diff --git a/pixels-common/src/main/java/io/pixelsdb/pixels/common/physical/StreamPath.java b/pixels-common/src/main/java/io/pixelsdb/pixels/common/physical/StreamPath.java new file mode 100644 index 000000000..16966484a --- /dev/null +++ b/pixels-common/src/main/java/io/pixelsdb/pixels/common/physical/StreamPath.java @@ -0,0 +1,37 @@ +package io.pixelsdb.pixels.common.physical; + +import static java.util.Objects.requireNonNull; + +public class StreamPath +{ + private String host; + private int port; + public boolean valid = false; + + public StreamPath(String path) + { + requireNonNull(path); + if (path.contains(":///")) + { + path = path.substring(path.indexOf(":///") + 4); + } + int colon = path.indexOf(':'); + if (colon > 0) + { + host = path.substring(0, colon); + port = Integer.parseInt(path.substring(colon + 1)); + this.valid = true; + } + } + + public String getHostName() + { + return host; + } + + public int getPort() + { + return port; + } + +} diff --git a/pixels-common/src/main/java/io/pixelsdb/pixels/common/utils/Constants.java b/pixels-common/src/main/java/io/pixelsdb/pixels/common/utils/Constants.java index a7ce4b42b..4f6decf3f 100644 --- a/pixels-common/src/main/java/io/pixelsdb/pixels/common/utils/Constants.java +++ b/pixels-common/src/main/java/io/pixelsdb/pixels/common/utils/Constants.java @@ -32,6 +32,7 @@ public final class Constants public static final int S3_BUFFER_SIZE = 8 * 1024 * 1024; public static final int REDIS_BUFFER_SIZE = 8 * 1024 * 1024; public static final int GCS_BUFFER_SIZE = 8 * 1024 * 1024; + public static final int STREAM_BUFFER_SIZE = 8 * 1024 * 1024; public static final int MIN_REPEAT = 3; public static final int MAX_SCOPE = 512; diff --git a/pixels-common/src/main/java/io/pixelsdb/pixels/common/utils/HttpServer.java b/pixels-common/src/main/java/io/pixelsdb/pixels/common/utils/HttpServer.java index 216f12540..3c3357a1c 100644 --- a/pixels-common/src/main/java/io/pixelsdb/pixels/common/utils/HttpServer.java +++ b/pixels-common/src/main/java/io/pixelsdb/pixels/common/utils/HttpServer.java @@ -40,16 +40,20 @@ public final class HttpServer { final HttpServerInitializer initializer; + private EventLoopGroup bossGroup; + private EventLoopGroup workerGroup; + private Channel channel; public HttpServer(HttpServerHandler handler) throws CertificateException, SSLException { this.initializer = new HttpServerInitializer(HttpServerUtil.buildSslContext(), handler); + handler.setServerCloser(this::close); } public void serve(int PORT) throws InterruptedException { - EventLoopGroup bossGroup = new NioEventLoopGroup(1); - EventLoopGroup workerGroup = new NioEventLoopGroup(); + bossGroup = new NioEventLoopGroup(1); + workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); @@ -59,13 +63,28 @@ public void serve(int PORT) throws InterruptedException .handler(new LoggingHandler(LogLevel.DEBUG)) .childHandler(this.initializer); - Channel ch = b.bind(PORT).sync().channel(); - - ch.closeFuture().sync(); + channel = b.bind(PORT).sync().channel(); + channel.closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } + + public void close() + { + if (channel != null) + { + channel.close(); + } + if (bossGroup != null) + { + bossGroup.shutdownGracefully(); + } + if (workerGroup != null) + { + workerGroup.shutdownGracefully(); + } + } } diff --git a/pixels-common/src/main/java/io/pixelsdb/pixels/common/utils/HttpServerHandler.java 
b/pixels-common/src/main/java/io/pixelsdb/pixels/common/utils/HttpServerHandler.java index 4536b5d09..fbcbebc8a 100644 --- a/pixels-common/src/main/java/io/pixelsdb/pixels/common/utils/HttpServerHandler.java +++ b/pixels-common/src/main/java/io/pixelsdb/pixels/common/utils/HttpServerHandler.java @@ -43,6 +43,7 @@ @ChannelHandler.Sharable public class HttpServerHandler extends SimpleChannelInboundHandler { + protected Runnable serverCloser; @Override public void channelReadComplete(ChannelHandlerContext ctx) @@ -100,4 +101,8 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) ChannelFuture f = ctx.writeAndFlush(response); f.addListener(ChannelFutureListener.CLOSE); } + + public void setServerCloser(Runnable serverCloser) { + this.serverCloser = serverCloser; + } } diff --git a/pixels-core/src/main/java/io/pixelsdb/pixels/core/PixelsReaderStreamImpl.java b/pixels-core/src/main/java/io/pixelsdb/pixels/core/PixelsReaderStreamImpl.java index 581da9b4f..98f41029f 100644 --- a/pixels-core/src/main/java/io/pixelsdb/pixels/core/PixelsReaderStreamImpl.java +++ b/pixels-core/src/main/java/io/pixelsdb/pixels/core/PixelsReaderStreamImpl.java @@ -84,6 +84,7 @@ public class PixelsReaderStreamImpl implements PixelsReader // In partitioned mode, we use byteBufBlockingMap to map hash value to corresponding ByteBuf private final BlockingMap byteBufBlockingMap; private final boolean partitioned; + private final int httpPort; private final AtomicReference numPartitionsReceived = new AtomicReference<>(0); private final List recordReaders; @@ -98,7 +99,7 @@ public class PixelsReaderStreamImpl implements PixelsReader public PixelsReaderStreamImpl(String endpoint) throws Exception { - this(endpoint, false, -1); + this(endpoint, false, -2); } public PixelsReaderStreamImpl(int port) throws Exception @@ -113,7 +114,7 @@ public PixelsReaderStreamImpl(String endpoint, boolean partitioned, int numParti this.streamHeader = null; URI uri = new URI(endpoint); String IP = uri.getHost(); - int httpPort = uri.getPort(); + this.httpPort = uri.getPort(); logger.debug("In Pixels stream reader constructor, IP: " + IP + ", port: " + httpPort + ", partitioned: " + partitioned + ", numPartitions: " + numPartitions); if (!Objects.equals(IP, "127.0.0.1") && !Objects.equals(IP, "localhost")) @@ -151,13 +152,6 @@ public void channelRead0(ChannelHandlerContext ctx, HttpObject msg) ", partition ID header: " + req.headers().get("X-Partition-Id") + ", HTTP request object body total length: " + req.content().readableBytes()); - // schema packet: only 1 packet expected, so close the connection immediately - // partitioned mode: close the connection if all partitions received - // else (non-partitioned mode, data packet): close connection if empty packet received - boolean needCloseParentChannel = partitionId == PixelsWriterStreamImpl.PARTITION_ID_SCHEMA_WRITER || - (partitioned && numPartitionsReceived.get() == numPartitions) || - (Objects.equals(req.headers().get(CONNECTION), CLOSE.toString()) && - req.content().readableBytes() == 0); ByteBuf byteBuf = req.content(); try { @@ -178,27 +172,29 @@ public void channelRead0(ChannelHandlerContext ctx, HttpObject msg) catch (IOException e) { logger.error("Invalid stream header values: ", e); - sendResponseAndClose(ctx, req, BAD_REQUEST, needCloseParentChannel); + sendResponseAndClose(ctx, req, BAD_REQUEST, false); return; } } else if (partitioned) { // In partitioned mode, every packet brings a streamHeader to prevent errors from possibly - // out-of-order packet arrivals, so 
we need to parse it, but do not need the return value - // (except for the first incoming packet processed above). + // out-of-order packet arrivals, so we need to parse it, but do not need the return value + // (except for the first incoming packet processed above). + // XXX: Now we have each worker pass the schema in a separate packet, so this is no longer + // necessary. We can remove this block of code in PixelsWriterStreamImpl. parseStreamHeader(byteBuf); } } catch (InvalidProtocolBufferException | IndexOutOfBoundsException e) { logger.error("Malformed or corrupted stream header", e); - sendResponseAndClose(ctx, req, BAD_REQUEST, needCloseParentChannel); + sendResponseAndClose(ctx, req, BAD_REQUEST, false); return; } // We only need to put the byteBuf into the blocking queue to pass it to the recordReader, if the - // client is a data writer rather than a schema writer. In the latter case, + // packet is a data packet rather than a schema packet. Because in the latter case, // the schema packet has been processed when parsing the stream header above. if (partitionId != PixelsWriterStreamImpl.PARTITION_ID_SCHEMA_WRITER) { @@ -209,7 +205,7 @@ else if (partitioned) if (partitionId < 0 || partitionId >= numPartitions) { logger.warn("Client sent invalid partitionId value: " + partitionId); - sendResponseAndClose(ctx, req, BAD_REQUEST, needCloseParentChannel); + sendResponseAndClose(ctx, req, BAD_REQUEST, false); return; } byteBufBlockingMap.put(partitionId, byteBuf); @@ -222,6 +218,13 @@ else if (partitioned) } } + // schema reader: only 1 packet expected, so close the connection immediately + // partitioned mode: close the connection if all partitions received + // else (non-partitioned mode, data packet): close connection if empty packet received + boolean needCloseParentChannel = numPartitions == PixelsWriterStreamImpl.PARTITION_ID_SCHEMA_WRITER || + (partitioned && numPartitionsReceived.get() == numPartitions) || + (numPartitions == -1 && Objects.equals(req.headers().get(CONNECTION), CLOSE.toString()) && + req.content().readableBytes() == 0); sendResponseAndClose(ctx, req, HttpResponseStatus.OK, needCloseParentChannel); } @@ -437,15 +440,7 @@ public int getRowGroupNum() @Override public boolean isPartitioned() { - try - { - streamHeaderLatch.await(); - } - catch (InterruptedException e) - { - logger.error("Interrupted while waiting for stream header", e); - } - return this.streamHeader.hasPartitioned() && this.streamHeader.getPartitioned(); + return partitioned; } /** @@ -523,6 +518,7 @@ public PixelsProto.Footer getFooter() public void close() throws IOException { + logger.debug("Closing PixelsReaderStreamImpl"); new Thread(() -> { // Conditions for closing: // 1. 
streamHeaderLatch.await() to ensure that the stream header has been received @@ -539,11 +535,11 @@ public void close() try { - if (!this.httpServerFuture.isDone()) this.httpServerFuture.get(5, TimeUnit.SECONDS); + if (!this.httpServerFuture.isDone()) this.httpServerFuture.get(300, TimeUnit.SECONDS); } catch (TimeoutException e) { - logger.warn("In close(), HTTP server did not shut down in 5 seconds, doing forceful shutdown"); + logger.warn("In close(), HTTP server on port " + httpPort + " did not shut down in 300 seconds, doing forceful shutdown"); this.httpServerFuture.cancel(true); } catch (InterruptedException | ExecutionException e) diff --git a/pixels-core/src/main/java/io/pixelsdb/pixels/core/PixelsWriterStreamImpl.java b/pixels-core/src/main/java/io/pixelsdb/pixels/core/PixelsWriterStreamImpl.java index 127aeb1f0..91cdcf2f5 100644 --- a/pixels-core/src/main/java/io/pixelsdb/pixels/core/PixelsWriterStreamImpl.java +++ b/pixels-core/src/main/java/io/pixelsdb/pixels/core/PixelsWriterStreamImpl.java @@ -512,6 +512,7 @@ public void addRowBatch(VectorizedRowBatch rowBatch, int hashValue) throws IOExc currHashValue = hashValue; hashValueIsSet = true; curRowGroupDataLength = 0; + if (rowBatch == null) return; curRowGroupNumOfRows += rowBatch.size; writeColumnVectors(rowBatch.cols, rowBatch.size); } @@ -550,8 +551,9 @@ public void close() { try { - if (curRowGroupNumOfRows != 0) + if (partitioned || curRowGroupNumOfRows != 0) { + // In partitioned mode, even an empty row group has to be sent to the server. writeRowGroup(); } // If the outgoing stream is empty (addRowBatch() and thus writeRowGroup() never called), we artificially @@ -609,7 +611,9 @@ else if (isFirstRowGroup) private void writeRowGroup() throws IOException { - if (isFirstRowGroup || partitioned) + // XXX: Now that we have each worker pass the schema in a separate packet in partitioned mode, it is no longer + // necessary to add a stream header to every packet. We can modify this block of code. + if (isFirstRowGroup || partitioned) // if (isFirstRowGroup) { writeStreamHeader(); isFirstRowGroup = false; @@ -769,7 +773,8 @@ private void writeRowGroup() throws IOException uri = URI.create(fileNameToUri(fileName)); } String reqUri = partitioned ? 
uris.get(currHashValue).toString() : uri.toString(); - logger.debug("Sending row group with length: " + byteBuf.writerIndex() + " to endpoint: " + reqUri); + logger.debug("Sending row group to endpoint: " + reqUri + ", length: " + byteBuf.writerIndex() + + ", partitionId: " + partitionId); Request req = httpClient.preparePost(reqUri) .setBody(byteBuf.nioBuffer()) .addHeader("X-Partition-Id", String.valueOf(partitionId)) @@ -786,8 +791,8 @@ private void writeRowGroup() throws IOException try { outstandingHTTPRequestSemaphore.acquire(); - int maxAttempts = 30000; - long backoffMillis = 10; + int maxAttempts = 3000; + long backoffMillis = 100; int attempt = 0; boolean success = false; diff --git a/pixels-core/src/main/java/io/pixelsdb/pixels/core/reader/PixelsRecordReaderStreamImpl.java b/pixels-core/src/main/java/io/pixelsdb/pixels/core/reader/PixelsRecordReaderStreamImpl.java index 0588cbd47..4d1dab6f4 100644 --- a/pixels-core/src/main/java/io/pixelsdb/pixels/core/reader/PixelsRecordReaderStreamImpl.java +++ b/pixels-core/src/main/java/io/pixelsdb/pixels/core/reader/PixelsRecordReaderStreamImpl.java @@ -37,6 +37,8 @@ import java.util.*; import java.util.concurrent.BlockingQueue; +import static com.google.common.base.Preconditions.checkArgument; + /** * PixelsRecordReaderStreamImpl is the variant of {@link PixelsRecordReaderImpl} for streaming mode. *

@@ -153,6 +155,12 @@ public PixelsRecordReaderStreamImpl(boolean partitioned, */ public void lateInitialization(PixelsStreamProto.StreamHeader streamHeader) throws IOException { + if (this.streamHeader != null) + { + checkArgument(this.streamHeader == streamHeader, + "streamHeader used for lateInitialization() is not the same as the one in the RecordReader."); + return; + } this.streamHeader = streamHeader; checkBeforeRead(); } @@ -426,6 +434,7 @@ public int prepareBatch(int batchSize) */ private VectorizedRowBatch createEmptyEOFRowBatch(int size) { + logger.debug("In createEmptyEOFRowBatch(), size = " + size); TypeDescription resultSchema = TypeDescription.createSchema(new ArrayList<>()); VectorizedRowBatch resultRowBatch = resultSchema.createRowBatch(0); resultRowBatch.projectionSize = 0; @@ -494,6 +503,14 @@ public VectorizedRowBatch readBatch(int batchSize, boolean reuse) } int rgRowCount = (int) curRowGroupStreamFooter.getNumberOfRows(); + if (rgRowCount == 0) + { + // Empty row group, mark the current row group as unreadable. + curRowGroupByteBuf.readerIndex(curRowGroupByteBuf.readerIndex() + curRowGroupByteBuf.readableBytes()); + curRGIdx++; + return resultSchema.createRowBatch(0, resultColumnsEncoded); + } + int curBatchSize; ColumnVector[] columnVectors = resultRowBatch.cols; @@ -702,6 +719,7 @@ private void acquireNewRowGroup(boolean reuse) throws IOException else // incoming byteBuf unreadable, must be end of stream { + logger.debug("In acquireNewRowGroup(), end of file"); // checkValid = false; // Issue #105: to reject continuous read. if (reuse && resultRowBatch != null) // XXX: Before we implement necessary checks, the close() below might be called before our readBatch() diff --git a/pixels-core/src/main/java/io/pixelsdb/pixels/core/utils/BlockingMap.java b/pixels-core/src/main/java/io/pixelsdb/pixels/core/utils/BlockingMap.java index c1f0107fc..1505e937d 100644 --- a/pixels-core/src/main/java/io/pixelsdb/pixels/core/utils/BlockingMap.java +++ b/pixels-core/src/main/java/io/pixelsdb/pixels/core/utils/BlockingMap.java @@ -51,7 +51,7 @@ public void put(K key, V value) public V get(K key) throws InterruptedException { - V ret = getQueue(key).poll(60, TimeUnit.SECONDS); + V ret = getQueue(key).poll(300, TimeUnit.SECONDS); if (ret == null) { throw new RuntimeException("BlockingMap.get() timed out"); diff --git a/pixels-planner/src/main/java/io/pixelsdb/pixels/planner/PixelsPlanner.java b/pixels-planner/src/main/java/io/pixelsdb/pixels/planner/PixelsPlanner.java index e9817470e..bb71c7538 100644 --- a/pixels-planner/src/main/java/io/pixelsdb/pixels/planner/PixelsPlanner.java +++ b/pixels-planner/src/main/java/io/pixelsdb/pixels/planner/PixelsPlanner.java @@ -970,6 +970,10 @@ else if (joinAlgo == JoinAlgorithm.PARTITIONED) rightTable, rightInputSplits, rightKeyColumnIds, rightPartitionProjection, numPartition, getIntermediateFolderForTrans(transId) + joinedTable.getSchemaName() + "/" + joinedTable.getTableName() + "/" + rightTable.getTableName() + "/"); + for (PartitionInput rightPartitionInput : rightPartitionInputs) + { + rightPartitionInput.setSmallTable(join.getJoinEndian() != JoinEndian.SMALL_LEFT); + } PartitionedTableInfo rightTableInfo = getPartitionedTableInfo( rightTable, rightKeyColumnIds, rightPartitionInputs, rightPartitionProjection); @@ -993,7 +997,7 @@ else if (joinAlgo == JoinAlgorithm.PARTITIONED) new PartitionedJoinBatchOperator(joinedTable.getTableName(), rightPartitionInputs, null, joinInputs, joinAlgo) : new 
PartitionedJoinStreamOperator(joinedTable.getTableName(), - null, rightPartitionInputs, joinInputs, joinAlgo); + rightPartitionInputs, null, joinInputs, joinAlgo); joinOperator.setLargeChild(childOperator); } } @@ -1005,6 +1009,11 @@ else if (joinAlgo == JoinAlgorithm.PARTITIONED) leftTable, leftInputSplits, leftKeyColumnIds, leftPartitionProjection, numPartition, getIntermediateFolderForTrans(transId) + joinedTable.getSchemaName() + "/" + joinedTable.getTableName() + "/" + leftTable.getTableName() + "/"); + for (PartitionInput leftPartitionInput : leftPartitionInputs) + { + leftPartitionInput.setSmallTable(join.getJoinEndian() == JoinEndian.SMALL_LEFT); + } + PartitionedTableInfo leftTableInfo = getPartitionedTableInfo( leftTable, leftKeyColumnIds, leftPartitionInputs, leftPartitionProjection); @@ -1013,6 +1022,11 @@ else if (joinAlgo == JoinAlgorithm.PARTITIONED) rightTable, rightInputSplits, rightKeyColumnIds, rightPartitionProjection, numPartition, getIntermediateFolderForTrans(transId) + joinedTable.getSchemaName() + "/" + joinedTable.getTableName() + "/" + rightTable.getTableName() + "/"); + for (PartitionInput rightPartitionInput : rightPartitionInputs) + { + rightPartitionInput.setSmallTable(join.getJoinEndian() != JoinEndian.SMALL_LEFT); + } + PartitionedTableInfo rightTableInfo = getPartitionedTableInfo( rightTable, rightKeyColumnIds, rightPartitionInputs, rightPartitionProjection); @@ -1264,7 +1278,9 @@ private PartitionedTableInfo getPartitionedTableInfo( if (table.getTableType() == Table.TableType.BASE) { return new PartitionedTableInfo(table.getTableName(), true, - newColumnsToRead, InputStorageInfo, rightPartitionedFiles.build(), + newColumnsToRead, + EnabledExchangeMethod == ExchangeMethod.batch ? InputStorageInfo : IntermediateStorageInfo, + rightPartitionedFiles.build(), IntraWorkerParallelism, newKeyColumnIds); } else { @@ -1310,7 +1326,9 @@ private List getPartitionInputs(Table inputTable, List getPartitionedJoinInputs( { boolean postPartition = false; PartitionInfo postPartitionInfo = null; + boolean postPartitionIsSmallTable = false; if (parent.isPresent() && parent.get().getJoin().getJoinAlgo() == JoinAlgorithm.PARTITIONED) { postPartition = true; @@ -1353,10 +1372,12 @@ private List getPartitionedJoinInputs( if (joinedTable == parent.get().getJoin().getLeftTable()) { postPartitionInfo = new PartitionInfo(parent.get().getJoin().getLeftKeyColumnIds(), numPostPartition); + postPartitionIsSmallTable = parent.get().getJoin().getJoinEndian() == JoinEndian.SMALL_LEFT; } else { postPartitionInfo = new PartitionInfo(parent.get().getJoin().getRightKeyColumnIds(), numPostPartition); + postPartitionIsSmallTable = parent.get().getJoin().getJoinEndian() != JoinEndian.SMALL_LEFT; } } @@ -1374,7 +1395,9 @@ private List getPartitionedJoinInputs( String path = getIntermediateFolderForTrans(transId) + joinedTable.getSchemaName() + "/" + joinedTable.getTableName() + "/"; - MultiOutputInfo output = new MultiOutputInfo(path, IntermediateStorageInfo, true, outputFileNames.build()); + MultiOutputInfo output = new MultiOutputInfo(path, + postPartition && EnabledExchangeMethod != ExchangeMethod.batch ? IntermediateStorageInfo : InputStorageInfo, + true, outputFileNames.build()); boolean[] leftProjection = leftPartitionProjection == null ? 
joinedTable.getJoin().getLeftProjection() : rewriteProjectionForPartitionedJoin(joinedTable.getJoin().getLeftProjection(), leftPartitionProjection); @@ -1386,7 +1409,7 @@ private List getPartitionedJoinInputs( { PartitionedJoinInfo joinInfo = new PartitionedJoinInfo(joinedTable.getJoin().getJoinType(), joinedTable.getJoin().getLeftColumnAlias(), joinedTable.getJoin().getRightColumnAlias(), - leftProjection, rightProjection, postPartition, postPartitionInfo, numPartition, ImmutableList.of(i)); + leftProjection, rightProjection, postPartition, postPartitionInfo, postPartitionIsSmallTable, numPartition, ImmutableList.of(i)); joinInput = new PartitionedJoinInput(transId, timestamp, leftTableInfo, rightTableInfo, joinInfo, false, null, output); } @@ -1394,7 +1417,7 @@ private List getPartitionedJoinInputs( { PartitionedJoinInfo joinInfo = new PartitionedJoinInfo(joinedTable.getJoin().getJoinType().flip(), joinedTable.getJoin().getRightColumnAlias(), joinedTable.getJoin().getLeftColumnAlias(), - rightProjection, leftProjection, postPartition, postPartitionInfo, numPartition, ImmutableList.of(i)); + rightProjection, leftProjection, postPartition, postPartitionInfo, postPartitionIsSmallTable, numPartition, ImmutableList.of(i)); joinInput = new PartitionedJoinInput(transId, timestamp, rightTableInfo, leftTableInfo, joinInfo, false, null, output); } diff --git a/pixels-planner/src/main/java/io/pixelsdb/pixels/planner/plan/physical/PartitionedJoinOperator.java b/pixels-planner/src/main/java/io/pixelsdb/pixels/planner/plan/physical/PartitionedJoinOperator.java index a76a65773..619b065a9 100644 --- a/pixels-planner/src/main/java/io/pixelsdb/pixels/planner/plan/physical/PartitionedJoinOperator.java +++ b/pixels-planner/src/main/java/io/pixelsdb/pixels/planner/plan/physical/PartitionedJoinOperator.java @@ -192,6 +192,7 @@ public void initPlanCoordinator(PlanCoordinator planCoordinator, int parentStage partitionInput.getTableInfo().setInputSplits(ImmutableList.of(inputSplit)); tasks.add(new Task(taskId++, JSON.toJSONString(partitionInput))); } + partitionInput.getTableInfo().setInputSplits(inputSplits); } StageCoordinator partitionStageCoordinator = new StageCoordinator(smallPartitionStageId, tasks); planCoordinator.addStageCoordinator(partitionStageCoordinator, partitionStageDependency); @@ -215,6 +216,7 @@ public void initPlanCoordinator(PlanCoordinator planCoordinator, int parentStage partitionInput.getTableInfo().setInputSplits(ImmutableList.of(inputSplit)); tasks.add(new Task(taskId++, JSON.toJSONString(partitionInput))); } + partitionInput.getTableInfo().setInputSplits(inputSplits); // restore the input splits after modifying partitionInput as a temporary variable } StageCoordinator partitionStageCoordinator = new StageCoordinator(largePartitionStageId, tasks); planCoordinator.addStageCoordinator(partitionStageCoordinator, partitionStageDependency); diff --git a/pixels-planner/src/main/java/io/pixelsdb/pixels/planner/plan/physical/PartitionedJoinStreamOperator.java b/pixels-planner/src/main/java/io/pixelsdb/pixels/planner/plan/physical/PartitionedJoinStreamOperator.java index b8d16747b..5dd2f8a23 100644 --- a/pixels-planner/src/main/java/io/pixelsdb/pixels/planner/plan/physical/PartitionedJoinStreamOperator.java +++ b/pixels-planner/src/main/java/io/pixelsdb/pixels/planner/plan/physical/PartitionedJoinStreamOperator.java @@ -27,6 +27,7 @@ import io.pixelsdb.pixels.planner.coordinate.PlanCoordinatorFactory; import io.pixelsdb.pixels.planner.plan.physical.input.JoinInput; import 
io.pixelsdb.pixels.planner.plan.physical.input.PartitionInput; +import io.pixelsdb.pixels.planner.plan.physical.input.PartitionedJoinInput; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -58,11 +59,48 @@ public CompletableFuture[]> execute() { // First, bootstrap the join workers. joinOutputs = new CompletableFuture[joinInputs.size()]; + int smallPartitionWorkerNum; + if (!smallPartitionInputs.isEmpty()) + { + smallPartitionWorkerNum = smallPartitionInputs.size(); + for (int i = 0; i < smallPartitionInputs.size(); ++i) + { + PartitionInput partitionInput = smallPartitionInputs.get(i); + partitionInput.setPartitionId(i); + } + } + else if (smallChild != null) + { + smallPartitionWorkerNum = smallChild.getJoinInputs().size(); + } + else + { + throw new IllegalStateException("smallPartitionInputs and smallChild are both null"); + } + int largePartitionWorkerNum; + if (!largePartitionInputs.isEmpty()) + { + largePartitionWorkerNum = largePartitionInputs.size(); + for (int i = 0; i < largePartitionInputs.size(); ++i) + { + PartitionInput partitionInput = largePartitionInputs.get(i); + partitionInput.setPartitionId(i); + } + } + else if (largeChild != null) + { + largePartitionWorkerNum = largeChild.getJoinInputs().size(); + } + else + { + throw new IllegalStateException("largePartitionInputs and largeChild are both null"); + } for (int i = 0; i < joinInputs.size(); ++i) { JoinInput joinInput = joinInputs.get(i); - joinInput.setSmallPartitionWorkerNum(smallPartitionInputs.size()); // XXX: could be 0 - joinInput.setLargePartitionWorkerNum(largePartitionInputs.size()); + joinInput.setSmallPartitionWorkerNum(smallPartitionWorkerNum); + joinInput.setLargePartitionWorkerNum(largePartitionWorkerNum); // XXX: Can do this in PixelsPlanner + ((PartitionedJoinInput)joinInput).getJoinInfo().setPostPartitionId(i); if (joinAlgo == JoinAlgorithm.PARTITIONED) { joinOutputs[i] = InvokerFactory.Instance() @@ -107,7 +145,7 @@ else if (smallChild != null) for (PartitionInput partitionInput : largePartitionInputs) { largePartitionOutputs[i++] = InvokerFactory.Instance() - .getInvoker(WorkerType.PARTITION_STREAMING).invoke((partitionInput)); + .getInvoker(WorkerType.PARTITION_STREAMING).invoke(partitionInput); } logger.debug("invoke large partition of " + this.getName()); @@ -155,7 +193,8 @@ else if (largeChild != null) logger.debug("invoke large partition of " + this.getName()); } - // todo: Finally, wait for the readiness of the partition operators + // todo: Finally, wait for the readiness of the partition workers + // (need to modify the partition workers to pull tasks from the worker coordinator server). return joinOutputs; }); diff --git a/pixels-planner/src/main/java/io/pixelsdb/pixels/planner/plan/physical/domain/JoinInfo.java b/pixels-planner/src/main/java/io/pixelsdb/pixels/planner/plan/physical/domain/JoinInfo.java index cbc6aea5a..a7cce5b7d 100644 --- a/pixels-planner/src/main/java/io/pixelsdb/pixels/planner/plan/physical/domain/JoinInfo.java +++ b/pixels-planner/src/main/java/io/pixelsdb/pixels/planner/plan/physical/domain/JoinInfo.java @@ -59,6 +59,15 @@ public class JoinInfo * The partition information of the output if outputPartitioned is true. */ private PartitionInfo postPartitionInfo; + /** + * The partition id of this worker in post partition if outputPartitioned is true. + */ + private int postPartitionId; + /** + * Whether this table is the small table in the next-level join. This is used to determine the HTTP port + * when using streaming. 
+ */ + private boolean postPartitionIsSmallTable; /** * Default constructor for Jackson. @@ -147,4 +156,24 @@ public void setPostPartitionInfo(PartitionInfo postPartitionInfo) { this.postPartitionInfo = postPartitionInfo; } + + public int getPostPartitionId() + { + return postPartitionId; + } + + public void setPostPartitionId(int postPartitionId) + { + this.postPartitionId = postPartitionId; + } + + public boolean getPostPartitionIsSmallTable() + { + return postPartitionIsSmallTable; + } + + public void setPostPartitionIsSmallTable(boolean postPartitionIsSmallTable) + { + this.postPartitionIsSmallTable = postPartitionIsSmallTable; + } } diff --git a/pixels-planner/src/main/java/io/pixelsdb/pixels/planner/plan/physical/domain/PartitionedJoinInfo.java b/pixels-planner/src/main/java/io/pixelsdb/pixels/planner/plan/physical/domain/PartitionedJoinInfo.java index d5f939ff2..eb9dad94d 100644 --- a/pixels-planner/src/main/java/io/pixelsdb/pixels/planner/plan/physical/domain/PartitionedJoinInfo.java +++ b/pixels-planner/src/main/java/io/pixelsdb/pixels/planner/plan/physical/domain/PartitionedJoinInfo.java @@ -53,6 +53,18 @@ public PartitionedJoinInfo(JoinType joinType, String[] smallColumnAlias, String[ this.hashValues = hashValues; } + public PartitionedJoinInfo(JoinType joinType, String[] smallColumnAlias, String[] largeColumnAlias, + boolean[] smallProjection, boolean[] largeProjection, boolean postPartition, + PartitionInfo postPartitionInfo, boolean postPartitionIsSmallTable, + int numPartition, List hashValues) + { + super(joinType, smallColumnAlias, largeColumnAlias, smallProjection, largeProjection, + postPartition, postPartitionInfo); + this.numPartition = numPartition; + this.hashValues = hashValues; + this.setPostPartitionIsSmallTable(postPartitionIsSmallTable); + } + public int getNumPartition() { return numPartition; diff --git a/pixels-planner/src/main/java/io/pixelsdb/pixels/planner/plan/physical/input/PartitionInput.java b/pixels-planner/src/main/java/io/pixelsdb/pixels/planner/plan/physical/input/PartitionInput.java index 99999e59b..8301ee1ce 100644 --- a/pixels-planner/src/main/java/io/pixelsdb/pixels/planner/plan/physical/input/PartitionInput.java +++ b/pixels-planner/src/main/java/io/pixelsdb/pixels/planner/plan/physical/input/PartitionInput.java @@ -51,6 +51,15 @@ public class PartitionInput extends Input * The information about the hash partitioning. */ private PartitionInfo partitionInfo; + /** + * Whether this table is the small table in the next-level join. This is used to determine the HTTP port + * when using streaming. + */ + private boolean isSmallTable; + /** + * The id of this partition in the current stage. + */ + private int partitionId; /** * Default constructor for Jackson. 
@@ -109,4 +118,24 @@ public void setPartitionInfo(PartitionInfo partitionInfo)
     {
         this.partitionInfo = partitionInfo;
     }
+
+    public boolean isSmallTable()
+    {
+        return isSmallTable;
+    }
+
+    public void setSmallTable(boolean isSmallTable)
+    {
+        this.isSmallTable = isSmallTable;
+    }
+
+    public int getPartitionId()
+    {
+        return partitionId;
+    }
+
+    public void setPartitionId(int partitionId)
+    {
+        this.partitionId = partitionId;
+    }
 }
diff --git a/pixels-storage/pixels-storage-stream/pom.xml b/pixels-storage/pixels-storage-stream/pom.xml
new file mode 100644
index 000000000..c1c6cd54f
--- /dev/null
+++ b/pixels-storage/pixels-storage-stream/pom.xml
@@ -0,0 +1,86 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>io.pixelsdb</groupId>
+        <artifactId>pixels</artifactId>
+        <version>0.2.0-SNAPSHOT</version>
+        <relativePath>../../pom.xml</relativePath>
+    </parent>
+
+    <artifactId>pixels-storage-stream</artifactId>
+
+    <properties>
+        <maven.compiler.source>8</maven.compiler.source>
+        <maven.compiler.target>8</maven.compiler.target>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>io.pixelsdb</groupId>
+            <artifactId>pixels-common</artifactId>
+            <optional>true</optional>
+        </dependency>
+
+        <dependency>
+            <groupId>net.java.dev.jna</groupId>
+            <artifactId>jna</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.asynchttpclient</groupId>
+            <artifactId>async-http-client</artifactId>
+            <optional>true</optional>
+        </dependency>
+        <dependency>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-all</artifactId>
+            <optional>true</optional>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-client</artifactId>
+            <optional>true</optional>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+            <optional>true</optional>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-deploy-plugin</artifactId>
+                <version>${maven.plugin.deploy.version}</version>
+                <configuration>
+                    <altDeploymentRepository>local.mvn.repo::default::file://${project.parent.basedir}/mvn</altDeploymentRepository>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-source-plugin</artifactId>
+                <version>${maven.plugin.source.version}</version>
+                <executions>
+                    <execution>
+                        <id>attach-sources</id>
+                        <goals>
+                            <goal>jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
\ No newline at end of file
diff --git a/pixels-storage/pixels-storage-stream/src/main/java/io/pixelsdb/pixels/storage/stream/PhysicalStreamReader.java b/pixels-storage/pixels-storage-stream/src/main/java/io/pixelsdb/pixels/storage/stream/PhysicalStreamReader.java
new file mode 100644
index 000000000..cdc32a174
--- /dev/null
+++ b/pixels-storage/pixels-storage-stream/src/main/java/io/pixelsdb/pixels/storage/stream/PhysicalStreamReader.java
@@ -0,0 +1,138 @@
+package io.pixelsdb.pixels.storage.stream;
+
+import io.pixelsdb.pixels.common.physical.PhysicalReader;
+import io.pixelsdb.pixels.common.physical.Storage;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+
+import static java.util.Objects.requireNonNull;
+
+public class PhysicalStreamReader implements PhysicalReader
+{
+    private final Storage stream;
+    private final String path;
+    private final DataInputStream dataInputStream;
+
+    public PhysicalStreamReader(Storage storage, String path) throws IOException
+    {
+        if (storage instanceof Stream)
+        {
+            this.stream = (Stream) storage;
+        }
+        else
+        {
+            throw new IOException("Storage is not a Stream.");
+        }
+        this.path = path;
+        this.dataInputStream = storage.open(path);
+    }
+
+    @Override
+    public long getFileLength() throws IOException
+    {
+        throw new UnsupportedOperationException("Can't get file length in PhysicalStreamReader");
+    }
+
+    @Override
+    public void seek(long desired) throws IOException
+    {
+        throw new UnsupportedOperationException("Can't seek in PhysicalStreamReader");
+    }
+
+    @Override
+    public ByteBuffer readFully(int length) throws IOException
+    {
+        byte[] buffer = new byte[length];
+        dataInputStream.readFully(buffer);
+        return ByteBuffer.wrap(buffer);
+    }
+
+    @Override
+    public void readFully(byte[] buffer) throws IOException
+    {
+        dataInputStream.readFully(buffer);
+    }
+
+    @Override
+    public void readFully(byte[] buffer, int offset, int length) throws IOException
+    {
+        dataInputStream.readFully(buffer, offset, length);
+    }
+
+    @Override
+    public long readLong(ByteOrder byteOrder) throws IOException
+    {
+        if (requireNonNull(byteOrder).equals(ByteOrder.BIG_ENDIAN))
+        {
+            return dataInputStream.readLong();
+        }
+        else
+        {
+            return Long.reverseBytes(dataInputStream.readLong());
+        }
+    }
+
+    @Override
+    public int readInt(ByteOrder byteOrder) throws IOException
+    {
+        if (requireNonNull(byteOrder).equals(ByteOrder.BIG_ENDIAN))
+        {
+            return dataInputStream.readInt();
+        }
+        else
+        {
+            return Integer.reverseBytes(dataInputStream.readInt());
+        }
+    }
+
+    @Override
+    public boolean supportsAsync() { return false; }
+
+    @Override
+    public CompletableFuture<ByteBuffer> readAsync(long offset, int len) throws IOException
+    {
+        throw new UnsupportedOperationException("Can't read asynchronously in PhysicalStreamReader");
+    }
+
+    @Override
+    public void close() throws IOException
+    {
+        this.dataInputStream.close();
+    }
+
+    @Override
+    public String getPath() { return path; }
+
+    /**
+     * Get the port in the path.
+     *
+     * @return the part of the path after the last colon, i.e., the port
+     */
+    @Override
+    public String getName()
+    {
+        if (path == null)
+        {
+            return null;
+        }
+        int colon = path.lastIndexOf(':');
+        return path.substring(colon + 1);
+    }
+
+    @Override
+    public long getBlockId() throws IOException
+    {
+        throw new IOException("Can't get block id in PhysicalStreamReader");
+    }
+
+    @Override
+    public Storage.Scheme getStorageScheme() { return stream.getScheme(); }
+
+    @Override
+    public int getNumReadRequests() { return 0; }
+}
diff --git a/pixels-storage/pixels-storage-stream/src/main/java/io/pixelsdb/pixels/storage/stream/PhysicalStreamWriter.java b/pixels-storage/pixels-storage-stream/src/main/java/io/pixelsdb/pixels/storage/stream/PhysicalStreamWriter.java
new file mode 100644
index 000000000..e15637808
--- /dev/null
+++ b/pixels-storage/pixels-storage-stream/src/main/java/io/pixelsdb/pixels/storage/stream/PhysicalStreamWriter.java
@@ -0,0 +1,115 @@
+/*
+ * Copyright 2024 PixelsDB.
+ *
+ * This file is part of Pixels.
+ *
+ * Pixels is free software: you can redistribute it and/or modify
+ * it under the terms of the Affero GNU General Public License as
+ * published by the Free Software Foundation, either version 3 of
+ * the License, or (at your option) any later version.
+ *
+ * Pixels is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Affero GNU General Public License for more details.
+ *
+ * You should have received a copy of the Affero GNU General Public
+ * License along with Pixels. If not, see
+ * <https://www.gnu.org/licenses/>.
+ */
+package io.pixelsdb.pixels.storage.stream;
+
+import io.pixelsdb.pixels.common.physical.PhysicalWriter;
+import io.pixelsdb.pixels.common.physical.Storage;
+import io.pixelsdb.pixels.common.utils.Constants;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * @author huasiy
+ * @create 2024-11-08
+ */
+public class PhysicalStreamWriter implements PhysicalWriter
+{
+    private Stream stream;
+    private String path;
+    private long position;
+    private DataOutputStream dataOutputStream;
+
+    public PhysicalStreamWriter(Storage stream, String path) throws IOException
+    {
+        if (stream instanceof Stream)
+        {
+            this.stream = (Stream) stream;
+        }
+        else
+        {
+            throw new IOException("Storage is not a Stream.");
+        }
+        this.path = path;
+        this.dataOutputStream = stream.create(path, false, Constants.STREAM_BUFFER_SIZE);
+    }
+
+    /**
+     * Tell the writer the offset of the next write.
+     *
+     * @param length length of content
+     * @return starting offset after preparing.
+ */ + @Override + public long prepare(int length) throws IOException + { + return this.position; + } + + /** + * Append content to the file. + * + * @param buffer content buffer + * @return start offset of content in the file. + */ + @Override + public long append(ByteBuffer buffer) throws IOException + { + buffer.flip(); + int length = buffer.remaining(); + return append(buffer.array(), buffer.arrayOffset() + buffer.position(), length); + } + + /** + * Append content to the file. + * + * @param buffer content buffer container + * @param offset start offset of actual content buffer + * @param length length of actual content buffer + * @return start offset of content in the file. + */ + @Override + public long append(byte[] buffer, int offset, int length) throws IOException + { + long start = this.position; + dataOutputStream.write(buffer, offset, length); + position += length; + return start; + } + + @Override + public void close() throws IOException + { + dataOutputStream.close(); + } + + @Override + public void flush() throws IOException + { + dataOutputStream.flush(); + } + + @Override + public String getPath() { return path; } + + @Override + public int getBufferSize() { return Constants.STREAM_BUFFER_SIZE; } +} diff --git a/pixels-storage/pixels-storage-stream/src/main/java/io/pixelsdb/pixels/storage/stream/Stream.java b/pixels-storage/pixels-storage-stream/src/main/java/io/pixelsdb/pixels/storage/stream/Stream.java new file mode 100644 index 000000000..dc2a6b543 --- /dev/null +++ b/pixels-storage/pixels-storage-stream/src/main/java/io/pixelsdb/pixels/storage/stream/Stream.java @@ -0,0 +1,163 @@ +/* + * Copyright 2024 PixelsDB. + * + * This file is part of Pixels. + * + * Pixels is free software: you can redistribute it and/or modify + * it under the terms of the Affero GNU General Public License as + * published by the Free Software Foundation, either version 3 of + * the License, or (at your option) any later version. + * + * Pixels is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Affero GNU General Public License for more details. + * + * You should have received a copy of the Affero GNU General Public + * License along with Pixels. If not, see + * . + */ +package io.pixelsdb.pixels.storage.stream; + +import io.pixelsdb.pixels.common.physical.Status; +import io.pixelsdb.pixels.common.physical.Storage; +import io.pixelsdb.pixels.common.physical.StreamPath; +import io.pixelsdb.pixels.storage.stream.io.StreamInputStream; +import io.pixelsdb.pixels.storage.stream.io.StreamOutputStream; +import sun.reflect.generics.reflectiveObjects.NotImplementedException; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.List; + + +public final class Stream implements Storage +{ + private static final String SchemePrefix = Scheme.httpstream.name() + "://"; + + public Stream() { } + + @Override + public Scheme getScheme() { return Scheme.httpstream; } + + @Override + public String ensureSchemePrefix(String path) throws IOException + { + if (path.startsWith(SchemePrefix)) + { + return path; + } + if (path.contains("://")) + { + throw new IOException("Path '" + path + + "' already has a different scheme prefix than '" + SchemePrefix + "'."); + } + return SchemePrefix + path; + } + + /** + * This method is used for read content from stream. 
+ * @param path + * @return + */ + @Override + public DataInputStream open(String path) throws IOException + { + StreamPath streamPath = new StreamPath(path); + if (!streamPath.valid) + { + throw new IOException("Path '" + path + "' is not valid."); + } + + StreamInputStream inputStream; + try + { + inputStream = new StreamInputStream(streamPath.getHostName(), streamPath.getPort()); + } catch (Exception e) + { + throw new IOException("Failed to open streamInputStream, " + e.toString()); + } + return new DataInputStream(inputStream); + } + + @Override + public List listStatus(String... path) + { + throw new NotImplementedException(); + } + + @Override + public List listPaths(String... path) + { + throw new NotImplementedException(); + } + + @Override + public Status getStatus(String path) + { + throw new NotImplementedException(); + } + + @Override + public long getFileId(String path) + { + throw new NotImplementedException(); + } + + @Override + public boolean mkdirs(String path) + { + throw new NotImplementedException(); + } + + /** + * This method is used for write content to stream. + */ + @Override + public DataOutputStream create(String path, boolean overwrite, int bufferSize) throws IOException + { + StreamPath streamPath = new StreamPath(path); + if (!streamPath.valid) + { + throw new IOException("Path '" + path + "' is not valid."); + } + return new DataOutputStream(new StreamOutputStream(streamPath.getHostName(), streamPath.getPort(), bufferSize)); + } + + @Override + public boolean delete(String path, boolean recursive) + { + throw new NotImplementedException(); + } + + @Override + public boolean supportDirectCopy() { return false; } + + @Override + public boolean directCopy(String src, String dest) + { + throw new NotImplementedException(); + } + + @Override + public void close() throws IOException { } + + @Override + public boolean exists(String path) + { + throw new NotImplementedException(); + } + + @Override + public boolean isFile(String path) + { + return false; + } + + @Override + public boolean isDirectory(String path) + { + return false; + } +} \ No newline at end of file diff --git a/pixels-storage/pixels-storage-stream/src/main/java/io/pixelsdb/pixels/storage/stream/StreamProvider.java b/pixels-storage/pixels-storage-stream/src/main/java/io/pixelsdb/pixels/storage/stream/StreamProvider.java new file mode 100644 index 000000000..4f0e92054 --- /dev/null +++ b/pixels-storage/pixels-storage-stream/src/main/java/io/pixelsdb/pixels/storage/stream/StreamProvider.java @@ -0,0 +1,67 @@ +/* + * Copyright 2024 PixelsDB. + * + * This file is part of Pixels. + * + * Pixels is free software: you can redistribute it and/or modify + * it under the terms of the Affero GNU General Public License as + * published by the Free Software Foundation, either version 3 of + * the License, or (at your option) any later version. + * + * Pixels is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Affero GNU General Public License for more details. + * + * You should have received a copy of the Affero GNU General Public + * License along with Pixels. If not, see + * . 
+ */ +package io.pixelsdb.pixels.storage.stream; + +import io.pixelsdb.pixels.common.physical.*; +import sun.reflect.generics.reflectiveObjects.NotImplementedException; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import java.io.IOException; + +/** + * @author huasiy + * @create 2024-11-05 + */ +public class StreamProvider implements StorageProvider +{ + @Override + public Storage createStorage(@Nonnull Storage.Scheme scheme) throws IOException + { + if (!this.compatibleWith(scheme)) + { + throw new IOException("incompatible storage scheme: " + scheme); + } + return new Stream(); + } + + @Override + public PhysicalReader createReader(@Nonnull Storage storage, @Nonnull String path, @Nullable PhysicalReaderOption option) throws IOException + { + if (!this.compatibleWith(storage.getScheme())) + { + throw new IOException("incompatible storage scheme: " + storage.getScheme()); + } + return new PhysicalStreamReader(storage, path); + } + + @Override + public PhysicalWriter createWriter(@Nonnull Storage storage, @Nonnull String path, @Nonnull PhysicalWriterOption option) throws IOException + { + if (!this.compatibleWith(storage.getScheme())) + { + throw new IOException("incompatible storage scheme: " + storage.getScheme()); + } + return new PhysicalStreamWriter(storage, path); + } + + @Override + public boolean compatibleWith(@Nonnull Storage.Scheme scheme) { return scheme.equals(Storage.Scheme.httpstream); } +} diff --git a/pixels-storage/pixels-storage-stream/src/main/java/io/pixelsdb/pixels/storage/stream/io/StreamInputStream.java b/pixels-storage/pixels-storage-stream/src/main/java/io/pixelsdb/pixels/storage/stream/io/StreamInputStream.java new file mode 100644 index 000000000..f2d8de611 --- /dev/null +++ b/pixels-storage/pixels-storage-stream/src/main/java/io/pixelsdb/pixels/storage/stream/io/StreamInputStream.java @@ -0,0 +1,257 @@ +package io.pixelsdb.pixels.storage.stream.io; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.http.*; +import io.pixelsdb.pixels.common.utils.HttpServer; +import io.pixelsdb.pixels.common.utils.HttpServerHandler; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import javax.net.ssl.SSLException; +import java.io.IOException; +import java.io.InputStream; +import java.security.cert.CertificateException; +import java.util.concurrent.*; + +public class StreamInputStream extends InputStream +{ + private static final Logger logger = LogManager.getLogger(StreamInputStream.class); + + /** + * indicates whether the stream is still open / valid + */ + private boolean open; + + /** + * The schema of http stream. + * Default value is http. + */ + private final String schema = "http"; + + /** + * The host of http stream. + */ + private String host; + + /** + * The port of http stream. + */ + private int port; + + /** + * The uri of http stream. + */ + private String uri; + + /** + * The temporary buffer used for storing the chunks. + */ + private final BlockingQueue contentQueue; + + /** + * The capacity of buffer. + */ + private final int bufferCapacity = 1000000000; + + /** + * The maximum tries to get data. + */ + private final int MAX_TRIES = 10; + + /** + * The milliseconds to sleep. + */ + private final int DELAY_MS = 2000; + + /** + * The http server for receiving input stream. + */ + private final HttpServer httpServer; + + /** + * The thread to run http server. 
+     */
+    private final ExecutorService executorService;
+
+    /**
+     * The future of http server.
+     */
+    private final CompletableFuture<Void> httpServerFuture;
+
+    public StreamInputStream(String host, int port) throws CertificateException, SSLException
+    {
+        this.open = true;
+        this.contentQueue = new LinkedBlockingDeque<>();
+        this.host = host;
+        this.port = port;
+        this.uri = this.schema + "://" + host + ":" + port;
+        this.httpServer = new HttpServer(new StreamHttpServerHandler(this));
+        this.executorService = Executors.newFixedThreadPool(1);
+        this.httpServerFuture = CompletableFuture.runAsync(() -> {
+            try
+            {
+                this.httpServer.serve(this.port);
+            } catch (InterruptedException e)
+            {
+                logger.error("http server interrupted", e);
+            }
+        }, this.executorService);
+    }
+
+    @Override
+    public int read() throws IOException
+    {
+        assertOpen();
+        if (!assertData())
+        {
+            return -1;
+        }
+
+        ByteBuf content = this.contentQueue.peek();
+        int b = -1;
+        if (content != null)
+        {
+            b = content.readUnsignedByte();
+            if (!content.isReadable())
+            {
+                // Release the exhausted chunk before dropping it from the queue.
+                content.release();
+                this.contentQueue.poll();
+            }
+        }
+        return b;
+    }
+
+    @Override
+    public int read(byte[] b) throws IOException
+    {
+        return read(b, 0, b.length);
+    }
+
+    /**
+     * Attempt to read up to len bytes into buf, starting at position off of buf.
+     * @param buf
+     * @param off
+     * @param len
+     * @return actual number of bytes read
+     * @throws IOException
+     */
+    @Override
+    public int read(byte[] buf, int off, int len) throws IOException
+    {
+        assertOpen();
+        if (!assertData())
+        {
+            return -1;
+        }
+
+        int readBytes = 0;
+        while (readBytes < len && !this.contentQueue.isEmpty())
+        {
+            ByteBuf content = this.contentQueue.peek();
+            int readLen = Math.min(len - readBytes, content.readableBytes());
+            // Copy to off + readBytes rather than readBytes, to honor the caller's offset.
+            content.readBytes(buf, off + readBytes, readLen);
+            if (!content.isReadable())
+            {
+                content.release();
+                this.contentQueue.poll();
+            }
+            readBytes += readLen;
+        }
+
+        return readBytes;
+    }
+
+    @Override
+    public void close() throws IOException
+    {
+        if (this.open)
+        {
+            this.open = false;
+            this.httpServerFuture.complete(null);
+            this.httpServer.close();
+        }
+    }
+
+    private boolean assertData() throws IOException
+    {
+        int tries = 0;
+        while (tries < this.MAX_TRIES && this.contentQueue.isEmpty() && !this.httpServerFuture.isDone())
+        {
+            try
+            {
+                tries++;
+                Thread.sleep(this.DELAY_MS);
+            } catch (InterruptedException e)
+            {
+                throw new IOException(e);
+            }
+        }
+
+        return !this.contentQueue.isEmpty();
+    }
+
+    private void assertOpen()
+    {
+        if (!this.open)
+        {
+            throw new IllegalStateException("Closed");
+        }
+    }
+
+    public static class StreamHttpServerHandler extends HttpServerHandler
+    {
+        private static final Logger logger = LogManager.getLogger(StreamHttpServerHandler.class);
+        private StreamInputStream inputStream;
+
+        public StreamHttpServerHandler(StreamInputStream inputStream)
+        {
+            this.inputStream = inputStream;
+        }
+
+        @Override
+        public void channelRead0(ChannelHandlerContext ctx, HttpObject msg)
+        {
+            if (!(msg instanceof HttpRequest))
+            {
+                return;
+            }
+            FullHttpRequest req = (FullHttpRequest) msg;
+            if (req.method() != HttpMethod.POST)
+            {
+                req.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE);
+                sendResponse(ctx, req, HttpResponseStatus.BAD_REQUEST);
+                return;
+            }
+
+            // Null-safe comparison: the CONTENT_TYPE header may be absent.
+            if (!"application/x-protobuf".equals(req.headers().get(HttpHeaderNames.CONTENT_TYPE)))
+            {
+                return;
+            }
+            ByteBuf content = req.content();
+            if (content.isReadable())
+            {
+                content.retain();
+                this.inputStream.contentQueue.add(content);
+            }
+            sendResponse(ctx, req, HttpResponseStatus.OK);
+        }
+
+        private void sendResponse(ChannelHandlerContext ctx, FullHttpRequest req, HttpResponseStatus status)
+        {
+            FullHttpResponse response = new DefaultFullHttpResponse(req.protocolVersion(), status);
+            response.headers()
+                    .set(HttpHeaderNames.CONTENT_TYPE, "text/plain")
+                    .set(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());
+
+            // Null-safe comparison: the CONNECTION header may be absent.
+            if (HttpHeaderValues.CLOSE.toString().equals(req.headers().get(HttpHeaderNames.CONNECTION)))
+            {
+                response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE);
+                ctx.writeAndFlush(response);
+                this.serverCloser.run();
+            } else
+            {
+                response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
+                ctx.writeAndFlush(response);
+            }
+        }
+    }
+}
diff --git a/pixels-storage/pixels-storage-stream/src/main/java/io/pixelsdb/pixels/storage/stream/io/StreamOutputStream.java b/pixels-storage/pixels-storage-stream/src/main/java/io/pixelsdb/pixels/storage/stream/io/StreamOutputStream.java
new file mode 100644
index 000000000..68637a3e5
--- /dev/null
+++ b/pixels-storage/pixels-storage-stream/src/main/java/io/pixelsdb/pixels/storage/stream/io/StreamOutputStream.java
@@ -0,0 +1,249 @@
+package io.pixelsdb.pixels.storage.stream.io;
+
+import io.netty.handler.codec.http.HttpHeaderNames;
+import io.netty.handler.codec.http.HttpHeaderValues;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.asynchttpclient.*;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+public class StreamOutputStream extends OutputStream
+{
+    private static final Logger logger = LogManager.getLogger(StreamOutputStream.class);
+
+    /**
+     * indicates whether the stream is still open / valid
+     */
+    private boolean open;
+
+    /**
+     * The schema of http stream.
+     * Default value is http.
+     */
+    private final String schema = "http";
+
+    /**
+     * The host of http stream.
+     */
+    private String host;
+
+    /**
+     * The port of http stream.
+     */
+    private int port;
+
+    /**
+     * The uri of http stream.
+     */
+    private String uri;
+
+    /**
+     * The maximum retry count.
+     */
+    private static final int MAX_RETRIES = 10;
+
+    /**
+     * The delay between two tries.
+     */
+    private static final long RETRY_DELAY_MS = 1000;
+
+    /**
+     * The temporary buffer used for storing the chunks.
+     */
+    private final byte[] buffer;
+
+    /**
+     * The position in the buffer.
+     */
+    private int bufferPosition;
+
+    /**
+     * The capacity of buffer.
+     */
+    private int bufferCapacity;
+
+    /**
+     * The http client.
+     */
+    private final AsyncHttpClient httpClient;
+
+    public StreamOutputStream(String host, int port, int bufferCapacity)
+    {
+        this.open = true;
+        this.host = host;
+        this.port = port;
+        this.uri = this.schema + "://" + host + ":" + port;
+        this.bufferCapacity = bufferCapacity;
+        this.buffer = new byte[bufferCapacity];
+        this.bufferPosition = 0;
+        this.httpClient = Dsl.asyncHttpClient();
+    }
+
+    /**
+     * Write an array to the stream output.
+     *
+     * @param b
+     * @throws IOException
+     */
+    @Override
+    public void write(byte[] b) throws IOException
+    {
+        write(b, 0, b.length);
+    }
+
+    @Override
+    public void write(final byte[] buf, final int off, final int len) throws IOException
+    {
+        this.assertOpen();
+        int offsetInBuf = off, remainToRead = len;
+        int remainInBuffer;
+        while (remainToRead > (remainInBuffer = this.buffer.length - bufferPosition))
+        {
+            System.arraycopy(buf, offsetInBuf, this.buffer, this.bufferPosition, remainInBuffer);
+            this.bufferPosition += remainInBuffer;
+            flushBufferAndRewind();
+            offsetInBuf += remainInBuffer;
+            remainToRead -= remainInBuffer;
+        }
+        System.arraycopy(buf, offsetInBuf, this.buffer, this.bufferPosition, remainToRead);
+        this.bufferPosition += remainToRead;
+    }
+
+    @Override
+    public void write(int b) throws IOException
+    {
+        this.assertOpen();
+        if (this.bufferPosition >= this.buffer.length)
+        {
+            flushBufferAndRewind();
+        }
+        this.buffer[this.bufferPosition++] = (byte) b;
+    }
+
+    @Override
+    public synchronized void flush()
+    {
+        assertOpen();
+        try
+        {
+            flushBufferAndRewind();
+        } catch (IOException e)
+        {
+            logger.error(e);
+        }
+    }
+
+    protected void flushBufferAndRewind() throws IOException
+    {
+        logger.debug("Sending {} bytes to stream", this.bufferPosition);
+        Request req = httpClient.preparePost(this.uri)
+                .setBody(ByteBuffer.wrap(this.buffer, 0, this.bufferPosition))
+                .addHeader(HttpHeaderNames.CONTENT_TYPE, "application/x-protobuf")
+                .addHeader(HttpHeaderNames.CONTENT_LENGTH, this.bufferPosition)
+                .addHeader(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE)
+                .build();
+        int retry = 0;
+        while (true)
+        {
+            StreamHttpClientHandler handler = new StreamHttpClientHandler();
+            try
+            {
+                httpClient.executeRequest(req, handler).get();
+                this.bufferPosition = 0;
+                break;
+            } catch (Exception e)
+            {
+                retry++;
+                if (retry > MAX_RETRIES || !(e.getCause() instanceof java.net.ConnectException))
+                {
+                    logger.error("retry count {}, exception cause {}, exception {}", retry, e.getCause(), e.getMessage());
+                    throw new IOException("Connect to stream failed");
+                } else
+                {
+                    try
+                    {
+                        Thread.sleep(RETRY_DELAY_MS);
+                    } catch (InterruptedException e1)
+                    {
+                        throw new IOException(e1);
+                    }
+                }
+            }
+        }
+    }
+
+    @Override
+    public void close() throws IOException
+    {
+        if (this.open)
+        {
+            this.open = false;
+            if (this.bufferPosition > 0)
+            {
+                flushBufferAndRewind();
+            }
+            closeStreamReader();
+            this.httpClient.close();
+        }
+    }
+
+    /**
+     * Tell the stream reader that this stream closes.
+     */
+    private void closeStreamReader()
+    {
+        Request req = httpClient.preparePost(this.uri)
+                .addHeader(HttpHeaderNames.CONTENT_TYPE, "application/x-protobuf")
+                .addHeader(HttpHeaderNames.CONTENT_LENGTH, 0)
+                .addHeader(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE)
+                .build();
+        int retry = 0;
+        while (true)
+        {
+            StreamHttpClientHandler handler = new StreamHttpClientHandler();
+            try
+            {
+                httpClient.executeRequest(req, handler).get();
+                break;
+            } catch (Exception e)
+            {
+                retry++;
+                if (retry > MAX_RETRIES || !(e.getCause() instanceof java.net.ConnectException))
+                {
+                    // Give up instead of retrying forever; otherwise this loop never exits.
+                    logger.error("failed to close stream reader", e);
+                    break;
+                }
+            }
+        }
+    }
+
+    private void assertOpen()
+    {
+        if (!this.open)
+        {
+            throw new IllegalStateException("Closed");
+        }
+    }
+
+    public static class StreamHttpClientHandler extends AsyncCompletionHandler<Response>
+    {
+        @Override
+        public Response onCompleted(Response response) throws Exception
+        {
+            if (response.getStatusCode() != 200)
+            {
+                throw new IOException("Failed to send packet to server, status code: " + response.getStatusCode());
+            }
+            return response;
+        }
+
+        @Override
+        public void onThrowable(Throwable t)
+        {
+            logger.error("stream http client handler, {}", t.getMessage());
+        }
+    }
+}
diff --git a/pixels-storage/pixels-storage-stream/src/test/java/io/pixelsdb/pixels/storage/stream/TestStream.java b/pixels-storage/pixels-storage-stream/src/test/java/io/pixelsdb/pixels/storage/stream/TestStream.java
new file mode 100644
index 000000000..1ba2cf245
--- /dev/null
+++ b/pixels-storage/pixels-storage-stream/src/test/java/io/pixelsdb/pixels/storage/stream/TestStream.java
@@ -0,0 +1,138 @@
+/*
+ * Copyright 2024 PixelsDB.
+ *
+ * This file is part of Pixels.
+ *
+ * Pixels is free software: you can redistribute it and/or modify
+ * it under the terms of the Affero GNU General Public License as
+ * published by the Free Software Foundation, either version 3 of
+ * the License, or (at your option) any later version.
+ *
+ * Pixels is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Affero GNU General Public License for more details.
+ *
+ * You should have received a copy of the Affero GNU General Public
+ * License along with Pixels. If not, see
+ * <https://www.gnu.org/licenses/>.
+ */ +package io.pixelsdb.pixels.storage.stream; + +import io.pixelsdb.pixels.common.physical.*; +import org.apache.hadoop.io.IOUtils; +import org.junit.Test; + +import java.io.*; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.nio.file.Files; +import java.nio.file.Paths; + +import static com.google.common.base.Preconditions.checkArgument; + +public class TestStream +{ + private volatile Exception readerException = null; + private volatile Exception writerException = null; + + @Test + public void testStorage() throws IOException + { + Storage stream = StorageFactory.Instance().getStorage(Storage.Scheme.httpstream); + InputStream fileInput = Files.newInputStream(Paths.get("/tmp/test1")); + OutputStream outputStream = stream.create("stream:///localhost:29920", false, 4096); + InputStream inputStream = stream.open("stream:///localhost:29920"); + OutputStream fileOutput = Files.newOutputStream(Paths.get("/tmp/test2")); + IOUtils.copyBytes(fileInput, outputStream, 4096, true); + IOUtils.copyBytes(inputStream, fileOutput, 4096, true); + } + + /** + * Occasionally (with probability below 1/20), the physical reader fails to read the desired + * length of data, causing this test to fail. + * @throws IOException + */ + @Test + public void testPhysicalReaderAndWriter() throws IOException + { + Storage stream = StorageFactory.Instance().getStorage(Storage.Scheme.httpstream); + Thread readerThread = new Thread(() -> { + try + { + try (PhysicalReader fsReader = PhysicalReaderUtil.newPhysicalReader(stream, "stream:///localhost:29920")) + { + int num1 = fsReader.readInt(ByteOrder.BIG_ENDIAN); + assert(num1 == 13); + num1 = fsReader.readInt(ByteOrder.BIG_ENDIAN); + assert(num1 == 169); + + long num2 = fsReader.readLong(ByteOrder.BIG_ENDIAN); + assert(num2 == 28561); + num2 = fsReader.readLong(ByteOrder.BIG_ENDIAN); + assert(num2 == 815730721); + + ByteBuffer buffer; + for (int len = 1; len < 1000000000; len = len * 2) + { + buffer = fsReader.readFully(len); + for (int i = 0; i < len; i++) + { + byte tmp = buffer.get(); + if (tmp != 'a') + { + System.out.println(len); + throw new IOException("failed: " + len); + } + } + } + } + } catch (IOException e) + { + readerException = e; + throw new RuntimeException(e); + } + }); + Thread writerThread = new Thread(() -> { + try + { + try (PhysicalWriter fsWriter = PhysicalWriterUtil.newPhysicalWriter(stream, "stream:///localhost:29920", null)) + { + ByteBuffer buffer = ByteBuffer.allocate(24); + buffer.putInt(13); + buffer.putInt(169); + buffer.putLong(28561); + buffer.putLong(815730721); + fsWriter.append(buffer); + fsWriter.flush(); + for (int len = 1; len < 1000000000; len = len * 2) + { + buffer = ByteBuffer.allocate(len); + for (int i = 0; i < len; i++) + { + buffer.put((byte) 'a'); + } + fsWriter.append(buffer); + } + } + } catch (IOException e) + { + writerException = e; + throw new RuntimeException(e); + } + }); + readerThread.start(); + writerThread.start(); + try + { + readerThread.join(); + writerThread.join(); + if (this.readerException != null || this.writerException != null) + { + throw new IOException("reader or writer thread failed"); + } + } catch (InterruptedException e) + { + throw new RuntimeException(e); + } + } +} diff --git a/pixels-turbo/pixels-invoker-vhive/src/main/java/io/pixelsdb/pixels/invoker/vhive/VhiveInvoker.java b/pixels-turbo/pixels-invoker-vhive/src/main/java/io/pixelsdb/pixels/invoker/vhive/VhiveInvoker.java index cc2428be1..246dc16ff 100644 ---
a/pixels-turbo/pixels-invoker-vhive/src/main/java/io/pixelsdb/pixels/invoker/vhive/VhiveInvoker.java +++ b/pixels-turbo/pixels-invoker-vhive/src/main/java/io/pixelsdb/pixels/invoker/vhive/VhiveInvoker.java @@ -33,21 +33,23 @@ public abstract class VhiveInvoker implements Invoker { private static final Logger logger = LogManager.getLogger(VhiveInvoker.class); private final String functionName; - private final int memoryMB; + private int memoryMB; protected VhiveInvoker(String functionName) { this.functionName = functionName; + // Fetch the memory size asynchronously to avoid blocking the constructor; + // memoryMB may remain 0 until the fetch completes. - int memoryMB = 0; - try - { - TurboProto.GetMemoryResponse response = Vhive.Instance().getAsyncClient().getMemory().get(); - memoryMB = (int) response.getMemoryMB(); - } catch (Exception e) - { - logger.error("failed to get memory: " + e); - } - this.memoryMB = memoryMB; + new Thread(() -> { + int memoryMB = 0; + try + { + TurboProto.GetMemoryResponse response = Vhive.Instance().getAsyncClient().getMemory().get(); + memoryMB = (int) response.getMemoryMB(); + } catch (Exception e) + { + logger.error("failed to get memory: " + e); + } + this.memoryMB = memoryMB; + }).start(); } @Override diff --git a/pixels-turbo/pixels-worker-common/src/main/java/io/pixelsdb/pixels/worker/common/BasePartitionStreamWorker.java b/pixels-turbo/pixels-worker-common/src/main/java/io/pixelsdb/pixels/worker/common/BasePartitionStreamWorker.java index 35a5e4b23..c5990bab2 100644 --- a/pixels-turbo/pixels-worker-common/src/main/java/io/pixelsdb/pixels/worker/common/BasePartitionStreamWorker.java +++ b/pixels-turbo/pixels-worker-common/src/main/java/io/pixelsdb/pixels/worker/common/BasePartitionStreamWorker.java @@ -69,10 +69,12 @@ public BasePartitionStreamWorker(WorkerContext context) super(context); this.logger = context.getLogger(); this.workerMetrics = context.getWorkerMetrics(); - this.workerCoordinateService = new WorkerCoordinateService("128.110.218.225", 18894); - // Hardcoded for Cloudlab. todo: Need to figure out how to get the daemon IP dynamically. - // Perhaps add a field in the WorkerContext class to store the daemon IP, - // or to have the Pixels planner pass the daemon IP in the Input. + this.workerCoordinateService = new WorkerCoordinateService( + StreamWorkerCommon.getCoordinatorIp(), StreamWorkerCommon.getCoordinatorPort()); + // In cloud functions, the configuration file "pixels.properties" is not present, so the pre-packaged + // configuration file "pixels-common/src/main/resources/pixels.properties" is used at runtime. + // Therefore, you need to set the coordinator host and port in the pre-packaged configuration file + // before rebuilding the Docker image. } @Override @@ -184,19 +186,15 @@ public PartitionOutput process(PartitionInput event) .collect(ImmutableList.toImmutableList()); List outputEndpoints = downStreamWorkers.stream() .map(CFWorkerInfo::getIp) - .map(ip -> "http://" + ip + ":" - + (Objects.equals(event.getTableInfo().getTableName(), "part") ? "18688" : "18686") + "/") + .map(ip -> "http://" + ip + ":" + + (event.isSmallTable() ? StreamWorkerCommon.STREAM_PORT_SMALL_TABLE : StreamWorkerCommon.STREAM_PORT_LARGE_TABLE)) // .map(URI::create) .collect(Collectors.toList()); - // todo: Need to pass whether the table is the large table or the small table here into the partition worker. - // Perhaps add a boolean field in the PartitionInput class. - // Currently, we hardcode the table name for TPC-H Q14 - the large table (rightTable for join) uses port 18686 - // while the small table (leftTable for join) uses port 18688.
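The port convention used above is worth spelling out. Here is a minimal sketch, assuming only the two constants this patch adds to StreamWorkerCommon; the helper name endpointFor is illustrative and not part of the patch. The small (build-side) table always streams to port 18688 and the large (probe-side) table to port 18686 on the downstream worker:

    // Illustrative helper: derive an output endpoint from a downstream worker's IP.
    static String endpointFor(String ip, boolean isSmallTable)
    {
        int port = isSmallTable ? StreamWorkerCommon.STREAM_PORT_SMALL_TABLE   // 18688, small/build side
                                : StreamWorkerCommon.STREAM_PORT_LARGE_TABLE;  // 18686, large/probe side
        return "http://" + ip + ":" + port;
    }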
StreamWorkerCommon.passSchemaToNextLevel(writerSchema.get(), outputStorageInfo, outputEndpoints); PixelsWriter pixelsWriter = StreamWorkerCommon.getWriter(writerSchema.get(), StreamWorkerCommon.getStorage(outputStorageInfo.getScheme()), outputPath, encoding, - true, 0, // todo: hardcoded for only 1 partition worker scenario; need to pass the actual value + true, event.getPartitionId(), Arrays.stream(keyColumnIds).boxed().collect(Collectors.toList()), outputEndpoints, false); Set hashValues = new HashSet<>(numPartition); @@ -209,8 +207,12 @@ public PartitionOutput process(PartitionInput event) { pixelsWriter.addRowBatch(batch, hash); } - hashValues.add(hash); } + else + { + pixelsWriter.addRowBatch(null, hash); + } + hashValues.add(hash); } partitionOutput.addOutput(outputPath); partitionOutput.setHashValues(hashValues); diff --git a/pixels-turbo/pixels-worker-common/src/main/java/io/pixelsdb/pixels/worker/common/BasePartitionedJoinStreamWorker.java b/pixels-turbo/pixels-worker-common/src/main/java/io/pixelsdb/pixels/worker/common/BasePartitionedJoinStreamWorker.java index 6036f61ca..0ece6d760 100644 --- a/pixels-turbo/pixels-worker-common/src/main/java/io/pixelsdb/pixels/worker/common/BasePartitionedJoinStreamWorker.java +++ b/pixels-turbo/pixels-worker-common/src/main/java/io/pixelsdb/pixels/worker/common/BasePartitionedJoinStreamWorker.java @@ -19,6 +19,7 @@ */ package io.pixelsdb.pixels.worker.common; +import com.google.common.collect.ImmutableList; import io.pixelsdb.pixels.common.physical.Storage; import io.pixelsdb.pixels.core.PixelsReader; import io.pixelsdb.pixels.core.PixelsWriter; @@ -66,7 +67,8 @@ public BasePartitionedJoinStreamWorker(WorkerContext context) // this.logger = context.getLogger(); this.workerMetrics = context.getWorkerMetrics(); this.workerMetrics.clear(); - this.workerCoordinateService = new WorkerCoordinateService("128.110.218.225", 18894); + this.workerCoordinateService = new WorkerCoordinateService( + StreamWorkerCommon.getCoordinatorIp(), StreamWorkerCommon.getCoordinatorPort()); } @Override @@ -94,7 +96,11 @@ public JoinOutput process(PartitionedJoinInput event) List leftPartitioned = event.getSmallTable().getInputFiles(); requireNonNull(leftPartitioned, "leftPartitioned is null"); checkArgument(leftPartitioned.size() > 0, "leftPartitioned is empty"); - int leftParallelism = event.getSmallTable().getParallelism(); + int leftParallelism = 1; // event.getSmallTable().getParallelism(); + // todo: Intra-worker parallelism support in streaming mode + // Currently, we only support an intra-worker parallelism of 1 (no parallelism) in streaming mode. + // Need to allow each join worker to use multiple ports to read input in parallel, so as to + // build the hash table in parallel, thus achieving intra-worker parallelism. 
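One possible shape of the multi-port design the todo above envisions is sketched below. It is illustrative only, not part of this patch: it reuses the enclosing method's variables, `k` and `basePort` are hypothetical, and it assumes Joiner.populateLeftTable() tolerates concurrent calls. Each reader gets its own port and a slice of the hash values, so the hash table is populated in parallel:

    // Sketch: k stream readers on consecutive (hypothetical) ports, each building part of the hash table.
    int step = (hashValues.size() + k - 1) / k; // ceiling division: hash values per reader
    List<Future<?>> builders = new ArrayList<>();
    for (int i = 0; i < k; ++i)
    {
        List<Integer> slice = hashValues.subList(Math.min(i * step, hashValues.size()),
                Math.min((i + 1) * step, hashValues.size()));
        PixelsReader reader = StreamWorkerCommon.getReader(leftInputStorageInfo.getScheme(),
                "http://localhost:" + (basePort + i), true, event.getSmallPartitionWorkerNum());
        builders.add(threadPool.submit(() -> {
            buildHashTable(transId, joiner, leftPartitioned, leftColumnsToRead, leftInputStorageInfo.getScheme(),
                    slice, event.getSmallPartitionWorkerNum(), workerMetrics, reader);
            return null;
        }));
    }
    for (Future<?> f : builders)
    {
        try
        {
            f.get(); // join all builders before probing the right table
        }
        catch (Exception e)
        {
            throw new WorkerException("failed to build the hash table in parallel", e);
        }
    }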
checkArgument(leftParallelism > 0, "leftParallelism is not positive"); String[] leftColumnsToRead = event.getSmallTable().getColumnsToRead(); int[] leftKeyColumnIds = event.getSmallTable().getKeyColumnIds(); @@ -104,7 +110,7 @@ public JoinOutput process(PartitionedJoinInput event) List rightPartitioned = event.getLargeTable().getInputFiles(); requireNonNull(rightPartitioned, "rightPartitioned is null"); checkArgument(rightPartitioned.size() > 0, "rightPartitioned is empty"); - int rightParallelism = event.getLargeTable().getParallelism(); + int rightParallelism = 1; // event.getLargeTable().getParallelism(); checkArgument(rightParallelism > 0, "rightParallelism is not positive"); String[] rightColumnsToRead = event.getLargeTable().getColumnsToRead(); int[] rightKeyColumnIds = event.getLargeTable().getKeyColumnIds(); @@ -159,7 +165,14 @@ public JoinOutput process(PartitionedJoinInput event) AtomicReference leftSchema = new AtomicReference<>(); AtomicReference rightSchema = new AtomicReference<>(); - // `registerWorker()` might awake the dependent workers, so it should be called just before + // Bootstrap the readers immediately; they stay up throughout the worker's lifetime, + // to ensure immediate reception of intermediate data and avoid retries on the writer side. + PixelsReader leftPixelsReader = StreamWorkerCommon.getReader(leftInputStorageInfo.getScheme(), + "http://localhost:" + StreamWorkerCommon.STREAM_PORT_SMALL_TABLE, true, event.getSmallPartitionWorkerNum()); + PixelsReader rightPixelsReader = StreamWorkerCommon.getReader(rightInputStorageInfo.getScheme(), + "http://localhost:" + StreamWorkerCommon.STREAM_PORT_LARGE_TABLE, true, event.getLargePartitionWorkerNum()); + + // `registerWorker()` might wake up the dependent workers, so it should be called just before or after + // the current worker listens on its HTTP port and is ready to receive streaming packets. CFWorkerInfo workerInfo = new CFWorkerInfo( InetAddress.getLocalHost().getHostAddress(), -1, @@ -172,14 +185,9 @@ public JoinOutput process(PartitionedJoinInput event) logger.debug("getSchemaFromPaths, left input: " + leftPartitioned + ", right input: " + rightPartitioned); - StreamWorkerCommon.getSchemaFromPaths(threadPool, - StreamWorkerCommon.getStorage(leftInputStorageInfo.getScheme()), - StreamWorkerCommon.getStorage(rightInputStorageInfo.getScheme()), - leftSchema, rightSchema, - Collections.singletonList("http://localhost:18688/"), - Collections.singletonList("http://localhost:18686/")); - // XXX: Better to ensure the subsequent data reader is up immediately after the schema is ready, - // to avoid retries on the writer side. + // XXX: StreamWorkerCommon.getSchemaFromPaths() can be removed + leftSchema.set(leftPixelsReader.getFileSchema()); + rightSchema.set(rightPixelsReader.getFileSchema()); /* * Issue #450: * For the left and the right partial partitioned files, the file schema is equal to the columns to read in normal cases.
@@ -190,6 +198,22 @@ public JoinOutput process(PartitionedJoinInput event) leftColAlias, leftProjection, leftKeyColumnIds, StreamWorkerCommon.getResultSchema(rightSchema.get(), rightColumnsToRead), rightColAlias, rightProjection, rightKeyColumnIds); + List downStreamWorkers = workerCoordinateService.getDownstreamWorkers(worker.getWorkerId()) + .stream() + .sorted(Comparator.comparing(worker -> worker.getHashValues().get(0))) + .collect(ImmutableList.toImmutableList()); + List outputEndpoints = downStreamWorkers.stream() + .map(CFWorkerInfo::getIp) + .map(ip -> "http://" + ip + ":" + + (event.getJoinInfo().getPostPartitionIsSmallTable() ? + StreamWorkerCommon.STREAM_PORT_SMALL_TABLE : StreamWorkerCommon.STREAM_PORT_LARGE_TABLE)) + // .map(URI::create) + .collect(Collectors.toList()); + if (partitionOutput) + { + StreamWorkerCommon.passSchemaToNextLevel(joiner.getJoinedSchema(), outputStorageInfo, outputEndpoints); + } + // build the hash table for the left table. List leftFutures = new ArrayList<>(leftPartitioned.size()); int leftSplitSize = leftPartitioned.size() / leftParallelism; @@ -200,8 +224,6 @@ public JoinOutput process(PartitionedJoinInput event) for (int i = 0; i < leftPartitioned.size(); i += leftSplitSize) { List parts = new LinkedList<>(); - // XXX: Can allow 1 join worker to use multiple ports to read input in parallel, so as to - // build the hash table in parallel. for (int j = i; j < i + leftSplitSize && j < leftPartitioned.size(); ++j) { parts.add(leftPartitioned.get(j)); @@ -210,7 +232,7 @@ public JoinOutput process(PartitionedJoinInput event) try { buildHashTable(transId, joiner, parts, leftColumnsToRead, leftInputStorageInfo.getScheme(), - hashValues, event.getSmallPartitionWorkerNum(), workerMetrics); + hashValues, event.getSmallPartitionWorkerNum(), workerMetrics, leftPixelsReader); } catch (Throwable e) { @@ -239,8 +261,10 @@ public JoinOutput process(PartitionedJoinInput event) } // scan the right table and do the join. - if (joiner.getSmallTableSize() > 0) - { + // We no longer check this condition in streaming mode, because even if the joiner is empty, + // we have to read from the right table to enforce the streaming protocol. +// if (joiner.getSmallTableSize() > 0) +// { int rightSplitSize = rightPartitioned.size() / rightParallelism; if (rightPartitioned.size() % rightParallelism > 0) { @@ -261,10 +285,10 @@ public JoinOutput process(PartitionedJoinInput event) joinWithRightTableAndPartition( transId, joiner, parts, rightColumnsToRead, rightInputStorageInfo.getScheme(), hashValues, - event.getLargePartitionWorkerNum(), outputPartitionInfo, result, workerMetrics) : + event.getLargePartitionWorkerNum(), outputPartitionInfo, result, workerMetrics, rightPixelsReader) : joinWithRightTable(transId, joiner, parts, rightColumnsToRead, rightInputStorageInfo.getScheme(), hashValues, - event.getLargePartitionWorkerNum(), result.get(0), workerMetrics); + event.getLargePartitionWorkerNum(), result.get(0), workerMetrics, rightPixelsReader); } catch (Throwable e) { @@ -286,20 +310,25 @@ public JoinOutput process(PartitionedJoinInput event) { throw new WorkerException("error occurred threads, please check the stacktrace before this log record"); } - } +// } String outputPath = outputFolder + outputInfo.getFileNames().get(0); try { WorkerMetrics.Timer writeCostTimer = new WorkerMetrics.Timer().start(); PixelsWriter pixelsWriter; + // XXX: The post partition code below is adapted to the streaming protocol. 
+ // Consider modifying the reader and writer code instead (good practice of layering) if (partitionOutput) { + // In partitioned mode, the schema is sent in an over-replicated manner: + // every previous-stage worker (rather than one of them) sends a schema packet + // before sending its intermediate data, to prevent errors from possibly out-of-order packet arrivals. pixelsWriter = StreamWorkerCommon.getWriter(joiner.getJoinedSchema(), StreamWorkerCommon.getStorage(outputStorageInfo.getScheme()), outputPath, - encoding, true, -1, Arrays.stream( + encoding, true, event.getJoinInfo().getPostPartitionId(), Arrays.stream( outputPartitionInfo.getKeyColumnIds()).boxed(). - collect(Collectors.toList())); + collect(Collectors.toList()), outputEndpoints, false); for (int hash = 0; hash < outputPartitionInfo.getNumPartition(); ++hash) { ConcurrentLinkedQueue batches = result.get(hash); @@ -310,6 +339,10 @@ public JoinOutput process(PartitionedJoinInput event) pixelsWriter.addRowBatch(batch, hash); } } + else + { + pixelsWriter.addRowBatch(null, hash); + } } } else @@ -345,9 +378,10 @@ public JoinOutput process(PartitionedJoinInput event) requireNonNull(outputPartitionInfo, "outputPartitionInfo is null"); pixelsWriter = StreamWorkerCommon.getWriter(joiner.getJoinedSchema(), StreamWorkerCommon.getStorage(outputStorageInfo.getScheme()), outputPath, - encoding, true, -1, Arrays.stream( + encoding, true, event.getJoinInfo().getPostPartitionId(), Arrays.stream( outputPartitionInfo.getKeyColumnIds()).boxed(). - collect(Collectors.toList())); + collect(Collectors.toList())); // , outputEndpoints, false); + // TODO: Adapt the left-outer tail to streaming mode. joiner.writeLeftOuterAndPartition(pixelsWriter, StreamWorkerCommon.rowBatchSize, outputPartitionInfo.getNumPartition(), outputPartitionInfo.getKeyColumnIds()); } @@ -408,7 +442,7 @@ public JoinOutput process(PartitionedJoinInput event) */ protected static void buildHashTable(long transId, Joiner joiner, List leftParts, String[] leftCols, Storage.Scheme leftScheme, List hashValues, int numPartition, - WorkerMetrics workerMetrics) throws IOException + WorkerMetrics workerMetrics, PixelsReader leftPixelsReader) throws IOException { // In streaming mode, numPartition is the total number of partition workers, i.e. the number of incoming packets. 
logger.debug("building hash table for the left table, partition paths: " + leftParts); @@ -416,73 +450,52 @@ protected static void buildHashTable(long transId, Joiner joiner, List l WorkerMetrics.Timer computeCostTimer = new WorkerMetrics.Timer(); long readBytes = 0L; int numReadRequests = 0; - while (!leftParts.isEmpty()) + + readCostTimer.start(); + PixelsReader pixelsReader = leftPixelsReader; + try { - for (Iterator it = leftParts.iterator(); it.hasNext(); ) + readCostTimer.stop(); + checkArgument(pixelsReader.isPartitioned(), "pixels file is not partitioned"); + for (int hashValue : hashValues) { - String leftPartitioned = it.next(); - readCostTimer.start(); - PixelsReader pixelsReader = null; - try - { - pixelsReader = StreamWorkerCommon.getReader(leftScheme, "http://localhost:18688/", true, numPartition); - readCostTimer.stop(); - checkArgument(pixelsReader.isPartitioned(), "pixels file is not partitioned"); - for (int hashValue : hashValues) - { - PixelsReaderOption option = StreamWorkerCommon.getReaderOption(transId, leftCols, pixelsReader, - hashValue, numPartition); - VectorizedRowBatch rowBatch; - PixelsRecordReader recordReader = pixelsReader.read(option); - // XXX: perhaps do not need to re-initialize the record reader for each hash value. - if (recordReader == null) continue; - checkArgument(recordReader.isValid(), "failed to get record reader"); - - computeCostTimer.start(); - do - { - rowBatch = recordReader.readBatch(StreamWorkerCommon.rowBatchSize); - if (rowBatch.size > 0) - { - joiner.populateLeftTable(rowBatch); - } - } while (!rowBatch.endOfFile); - computeCostTimer.stop(); - computeCostTimer.minus(recordReader.getReadTimeNanos()); - readCostTimer.add(recordReader.getReadTimeNanos()); - readBytes += recordReader.getCompletedBytes(); - numReadRequests += recordReader.getNumReadRequests(); - } - it.remove(); - } - catch (Throwable e) - { - if (e instanceof IOException) - { - continue; - } - throw new WorkerException("failed to scan the partitioned file '" + - leftPartitioned + "' and build the hash table", e); - } - finally + PixelsReaderOption option = StreamWorkerCommon.getReaderOption(transId, leftCols, pixelsReader, + hashValue, numPartition); + VectorizedRowBatch rowBatch; + PixelsRecordReader recordReader = pixelsReader.read(option); + // XXX: perhaps do not need to re-initialize the record reader for each hash value. + if (recordReader == null) continue; + // We no longer check the validity of the record reader here, because the record reader + // might not have been initialized yet due to the absence of the stream header. 
+ // checkArgument(recordReader.isValid(), "failed to get record reader"); + + computeCostTimer.start(); + do { - if (pixelsReader != null) + rowBatch = recordReader.readBatch(StreamWorkerCommon.rowBatchSize); + if (rowBatch.size > 0) { - logger.debug("closing pixels reader"); - pixelsReader.close(); + joiner.populateLeftTable(rowBatch); } - } + } while (!rowBatch.endOfFile); + computeCostTimer.stop(); + computeCostTimer.minus(recordReader.getReadTimeNanos()); + readCostTimer.add(recordReader.getReadTimeNanos()); + readBytes += recordReader.getCompletedBytes(); + numReadRequests += recordReader.getNumReadRequests(); } - if (!leftParts.isEmpty()) + } + catch (Throwable e) + { + if (!(e instanceof IOException)) + throw new WorkerException("failed to scan the partitioned file and build the hash table", e); + } + finally + { + if (pixelsReader != null) { - try - { - TimeUnit.MILLISECONDS.sleep(100); - } - catch (InterruptedException e) - { - throw new WorkerException("interrupted while waiting for the partitioned files"); - } + logger.debug("closing pixels reader on port " + StreamWorkerCommon.STREAM_PORT_SMALL_TABLE); + pixelsReader.close(); } } workerMetrics.addReadBytes(readBytes); @@ -508,95 +521,63 @@ protected static void buildHashTable(long transId, Joiner joiner, List l protected static int joinWithRightTable( long transId, Joiner joiner, List rightParts, String[] rightCols, Storage.Scheme rightScheme, List hashValues, int numPartition, ConcurrentLinkedQueue joinResult, - WorkerMetrics workerMetrics) throws IOException + WorkerMetrics workerMetrics, PixelsReader rightPixelsReader) throws IOException { int joinedRows = 0; WorkerMetrics.Timer readCostTimer = new WorkerMetrics.Timer(); WorkerMetrics.Timer computeCostTimer = new WorkerMetrics.Timer(); long readBytes = 0L; int numReadRequests = 0; - while (!rightParts.isEmpty()) + + readCostTimer.start(); + PixelsReader pixelsReader = rightPixelsReader; + try { - for (Iterator it = rightParts.iterator(); it.hasNext(); ) + readCostTimer.stop(); + checkArgument(pixelsReader.isPartitioned(), "pixels file is not partitioned"); + for (int hashValue : hashValues) { - String rightPartitioned = it.next(); - readCostTimer.start(); - PixelsReader pixelsReader = null; - try + PixelsReaderOption option = StreamWorkerCommon.getReaderOption(transId, rightCols, pixelsReader, + hashValue, numPartition); + VectorizedRowBatch rowBatch; + PixelsRecordReader recordReader = pixelsReader.read(option); + // checkArgument(recordReader.isValid(), "failed to get record reader"); + + computeCostTimer.start(); + do { - pixelsReader = StreamWorkerCommon.getReader(rightScheme, "http://localhost:18686/", true, numPartition); - readCostTimer.stop(); - checkArgument(pixelsReader.isPartitioned(), "pixels file is not partitioned"); - Set rightHashValues = new HashSet<>(numPartition); - for (int hashValue = 0; hashValue < numPartition; ++hashValue) - { - rightHashValues.add(hashValue); - } - for (int hashValue : hashValues) + rowBatch = recordReader.readBatch(StreamWorkerCommon.rowBatchSize); + if (rowBatch.size > 0) { - if (!rightHashValues.contains(hashValue)) + List joinedBatches = joiner.join(rowBatch); + for (VectorizedRowBatch joined : joinedBatches) { - continue; - } - PixelsReaderOption option = StreamWorkerCommon.getReaderOption(transId, rightCols, pixelsReader, - hashValue, numPartition); - VectorizedRowBatch rowBatch; - PixelsRecordReader recordReader = pixelsReader.read(option); - checkArgument(recordReader.isValid(), "failed to get record reader"); - - 
computeCostTimer.start(); - do - { - rowBatch = recordReader.readBatch(StreamWorkerCommon.rowBatchSize); - if (rowBatch.size > 0) + if (!joined.isEmpty()) { - List joinedBatches = joiner.join(rowBatch); - for (VectorizedRowBatch joined : joinedBatches) - { - if (!joined.isEmpty()) - { - joinResult.add(joined); - joinedRows += joined.size; - } - } + joinResult.add(joined); // XXX: Can modify this into PixelsWriter.addRowBatch(), to further exploit the parallelism. + joinedRows += joined.size; } - } while (!rowBatch.endOfFile); - computeCostTimer.stop(); - computeCostTimer.minus(recordReader.getReadTimeNanos()); - readCostTimer.add(recordReader.getReadTimeNanos()); - readBytes += recordReader.getCompletedBytes(); - numReadRequests += recordReader.getNumReadRequests(); - } - it.remove(); - } - catch (Throwable e) - { - if (e instanceof IOException) - { - continue; - } - throw new WorkerException("failed to scan the partitioned file '" + - rightPartitioned + "' and do the join", e); - } - finally - { - if (pixelsReader != null) - { - logger.debug("closing pixels reader"); - pixelsReader.close(); + } } - } + } while (!rowBatch.endOfFile); + computeCostTimer.stop(); + computeCostTimer.minus(recordReader.getReadTimeNanos()); + readCostTimer.add(recordReader.getReadTimeNanos()); + readBytes += recordReader.getCompletedBytes(); + numReadRequests += recordReader.getNumReadRequests(); } - if (!rightParts.isEmpty()) + } + catch (Throwable e) + { + if (!(e instanceof IOException)) + throw new WorkerException("failed to scan the partitioned file and do the join", e); + } + finally + { + if (pixelsReader != null) { - try - { - TimeUnit.MILLISECONDS.sleep(100); - } - catch (InterruptedException e) - { - throw new WorkerException("interrupted while waiting for the partitioned files"); - } + logger.debug("closing pixels reader on port " + StreamWorkerCommon.STREAM_PORT_LARGE_TABLE); + pixelsReader.close(); } } workerMetrics.addReadBytes(readBytes); @@ -624,7 +605,7 @@ protected static int joinWithRightTable( protected static int joinWithRightTableAndPartition( long transId, Joiner joiner, List rightParts, String[] rightCols, Storage.Scheme rightScheme, List hashValues, int numPartition, PartitionInfo postPartitionInfo, - List> partitionResult, WorkerMetrics workerMetrics) throws IOException + List> partitionResult, WorkerMetrics workerMetrics, PixelsReader rightPixelsReader) throws IOException { requireNonNull(postPartitionInfo, "outputPartitionInfo is null"); Partitioner partitioner = new Partitioner(postPartitionInfo.getNumPartition(), @@ -634,93 +615,62 @@ protected static int joinWithRightTableAndPartition( WorkerMetrics.Timer computeCostTimer = new WorkerMetrics.Timer(); long readBytes = 0L; int numReadRequests = 0; - while (!rightParts.isEmpty()) + + readCostTimer.start(); + PixelsReader pixelsReader = rightPixelsReader; + try { - for (Iterator it = rightParts.iterator(); it.hasNext(); ) + readCostTimer.stop(); + checkArgument(pixelsReader.isPartitioned(), "pixels file is not partitioned"); + // XXX: check that the hashValue in row group headers match the hashValue assigned to this worker + for (int hashValue : hashValues) { - String rightPartitioned = it.next(); - readCostTimer.start(); - PixelsReader pixelsReader = null; - try + PixelsReaderOption option = StreamWorkerCommon.getReaderOption(transId, rightCols, pixelsReader, + hashValue, numPartition); + VectorizedRowBatch rowBatch; + PixelsRecordReader recordReader = pixelsReader.read(option); + if (recordReader == null) continue; + // 
checkArgument(recordReader.isValid(), "failed to get record reader"); + + computeCostTimer.start(); + do { - pixelsReader = StreamWorkerCommon.getReader(rightScheme, "http://localhost:18686/", true, numPartition); - readCostTimer.stop(); - checkArgument(pixelsReader.isPartitioned(), "pixels file is not partitioned"); - Set rightHashValues = new HashSet<>(numPartition); - for (int hashValue = 0; hashValue < numPartition; ++hashValue) - { - rightHashValues.add(hashValue); - } - for (int hashValue : hashValues) + rowBatch = recordReader.readBatch(StreamWorkerCommon.rowBatchSize); + if (rowBatch.size > 0) { - if (!rightHashValues.contains(hashValue)) - { - continue; - } - PixelsReaderOption option = StreamWorkerCommon.getReaderOption(transId, rightCols, pixelsReader, - hashValue, numPartition); - VectorizedRowBatch rowBatch; - PixelsRecordReader recordReader = pixelsReader.read(option); - if (recordReader == null) continue; - checkArgument(recordReader.isValid(), "failed to get record reader"); - - computeCostTimer.start(); - do + List joinedBatches = joiner.join(rowBatch); + for (VectorizedRowBatch joined : joinedBatches) { - rowBatch = recordReader.readBatch(StreamWorkerCommon.rowBatchSize); - if (rowBatch.size > 0) + if (!joined.isEmpty()) { - List joinedBatches = joiner.join(rowBatch); - for (VectorizedRowBatch joined : joinedBatches) + Map parts = partitioner.partition(joined); + for (Map.Entry entry : parts.entrySet()) { - if (!joined.isEmpty()) - { - Map parts = partitioner.partition(joined); - for (Map.Entry entry : parts.entrySet()) - { - partitionResult.get(entry.getKey()).add(entry.getValue()); - } - joinedRows += joined.size; - } + partitionResult.get(entry.getKey()).add(entry.getValue()); } + joinedRows += joined.size; } - } while (!rowBatch.endOfFile); - computeCostTimer.stop(); - computeCostTimer.minus(recordReader.getReadTimeNanos()); - readCostTimer.add(recordReader.getReadTimeNanos()); - readBytes += recordReader.getCompletedBytes(); - numReadRequests += recordReader.getNumReadRequests(); - } - it.remove(); - } - catch (Throwable e) - { - if (e instanceof IOException) - { - continue; - } - throw new WorkerException("failed to scan the partitioned file '" + - rightPartitioned + "' and do the join", e); - } - finally - { - if (pixelsReader != null) - { - logger.debug("closing pixels reader"); - pixelsReader.close(); + } } - } + } while (!rowBatch.endOfFile); + computeCostTimer.stop(); + computeCostTimer.minus(recordReader.getReadTimeNanos()); + readCostTimer.add(recordReader.getReadTimeNanos()); + readBytes += recordReader.getCompletedBytes(); + numReadRequests += recordReader.getNumReadRequests(); } - if (!rightParts.isEmpty()) + } + catch (Throwable e) + { + if (!(e instanceof IOException)) + throw new WorkerException("failed to scan the partitioned file and do the join", e); + } + finally + { + if (pixelsReader != null) { - try - { - TimeUnit.MILLISECONDS.sleep(100); - } - catch (InterruptedException e) - { - throw new WorkerException("interrupted while waiting for the partitioned files"); - } + logger.debug("closing pixels reader on port " + StreamWorkerCommon.STREAM_PORT_LARGE_TABLE); + pixelsReader.close(); } } diff --git a/pixels-turbo/pixels-worker-common/src/main/java/io/pixelsdb/pixels/worker/common/StreamWorkerCommon.java b/pixels-turbo/pixels-worker-common/src/main/java/io/pixelsdb/pixels/worker/common/StreamWorkerCommon.java index dd6ce3c10..e0d9d4bbf 100644 --- 
a/pixels-turbo/pixels-worker-common/src/main/java/io/pixelsdb/pixels/worker/common/StreamWorkerCommon.java +++ b/pixels-turbo/pixels-worker-common/src/main/java/io/pixelsdb/pixels/worker/common/StreamWorkerCommon.java @@ -46,6 +46,9 @@ public class StreamWorkerCommon extends WorkerCommon private static final Logger logger = LogManager.getLogger(StreamWorkerCommon.class); private static final Storage http = null; // placeholder. todo: modularize into a pixels-storage-stream module. + public static final int STREAM_PORT_SMALL_TABLE = 18688; + public static final int STREAM_PORT_LARGE_TABLE = 18686; + public static void initStorage(StorageInfo storageInfo, Boolean isOutput) throws IOException { if (storageInfo.getScheme() == Storage.Scheme.httpstream) diff --git a/pixels-turbo/pixels-worker-vhive/src/main/resources/log4j2.properties b/pixels-turbo/pixels-worker-common/src/main/resources/log4j2.properties similarity index 78% rename from pixels-turbo/pixels-worker-vhive/src/main/resources/log4j2.properties rename to pixels-turbo/pixels-worker-common/src/main/resources/log4j2.properties index fe8abaa4b..61c9e552b 100644 --- a/pixels-turbo/pixels-worker-vhive/src/main/resources/log4j2.properties +++ b/pixels-turbo/pixels-worker-common/src/main/resources/log4j2.properties @@ -1,11 +1,11 @@ -name=pixels-worker-vhive +name=pixels-worker-common status=warn shutdownHook=disable rootLogger.level=info rootLogger.appenderRef.stdout.ref=STDOUT rootLogger.appenderRef.log.ref=log -filter.threshold.type=ThresholdFilter -filter.threshold.level=info +logger.pixelsdb.name=io.pixelsdb.pixels +logger.pixelsdb.level=info appender.console.type=Console appender.console.name=STDOUT appender.console.layout.type=PatternLayout @@ -13,7 +13,7 @@ appender.console.layout.pattern=%-d{yyyy-MM-dd HH:mm:ss,SSS} [%c]-[%p] %m%n appender.rolling.type=File appender.rolling.name=log appender.rolling.append=true -appender.rolling.fileName=pixels-worker-vhive.log +appender.rolling.fileName=pixels-worker-common.log appender.rolling.layout.type=PatternLayout appender.rolling.layout.pattern=%-d{yyyy-MM-dd HH:mm:ss,SSS} [%c]-[%p] %m%n diff --git a/pom.xml b/pom.xml index 0db4b7e3a..56a45e2b9 100644 --- a/pom.xml +++ b/pom.xml @@ -52,6 +52,7 @@ pixels-storage/pixels-storage-redis pixels-storage/pixels-storage-localfs pixels-storage/pixels-storage-mock + pixels-storage/pixels-storage-stream pixels-turbo/pixels-worker-common pixels-turbo/pixels-worker-lambda pixels-turbo/pixels-invoker-lambda
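Taken together, the worker changes above settle on one writer-side streaming protocol. The following condensed sketch restates it in order; the identifiers (schema, outputPath, encoding, partitionId, keyColumnIds, numPartition, outputStorageInfo, outputEndpoints) are stand-ins for the surrounding worker state, and batchesFor() is a hypothetical accessor for the per-hash result queue:

    // 1. Over-replicate the schema: every upstream worker sends a schema packet first.
    StreamWorkerCommon.passSchemaToNextLevel(schema, outputStorageInfo, outputEndpoints);
    // 2. Open a partitioned writer bound to the same endpoints.
    PixelsWriter writer = StreamWorkerCommon.getWriter(schema,
            StreamWorkerCommon.getStorage(outputStorageInfo.getScheme()), outputPath,
            encoding, true, partitionId, keyColumnIds, outputEndpoints, false);
    // 3. Send one packet per hash value; an empty partition still sends a packet
    //    (addRowBatch(null, hash)) so the reader can account for every partition.
    for (int hash = 0; hash < numPartition; ++hash)
    {
        writer.addRowBatch(batchesFor(hash), hash); // batchesFor() may return null
    }
    // 4. Closing the writer flushes the tail and posts an empty packet with
    //    Connection: close (see StreamOutputStream.closeStreamReader()), which
    //    tells the stream reader on the other side to finish.
    writer.close();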