package com.microsoft.azure.management.batchai.implementation;
import com.microsoft.azure.PagedList;
import com.microsoft.azure.management.apigeneration.LangDefinition;
import com.microsoft.azure.management.batchai.AzureBlobFileSystem;
import com.microsoft.azure.management.batchai.AzureBlobFileSystemReference;
import com.microsoft.azure.management.batchai.AzureFileShare;
import com.microsoft.azure.management.batchai.AzureFileShareReference;
import com.microsoft.azure.management.batchai.BatchAICluster;
import com.microsoft.azure.management.batchai.BatchAIExperiment;
import com.microsoft.azure.management.batchai.BatchAIJob;
import com.microsoft.azure.management.batchai.CNTKsettings;
import com.microsoft.azure.management.batchai.Caffe2Settings;
import com.microsoft.azure.management.batchai.CaffeSettings;
import com.microsoft.azure.management.batchai.ChainerSettings;
import com.microsoft.azure.management.batchai.ContainerImageSettings;
import com.microsoft.azure.management.batchai.ContainerSettings;
import com.microsoft.azure.management.batchai.CustomMpiSettings;
import com.microsoft.azure.management.batchai.CustomToolkitSettings;
import com.microsoft.azure.management.batchai.EnvironmentVariable;
import com.microsoft.azure.management.batchai.EnvironmentVariableWithSecretValue;
import com.microsoft.azure.management.batchai.ExecutionState;
import com.microsoft.azure.management.batchai.FileServer;
import com.microsoft.azure.management.batchai.FileServerReference;
import com.microsoft.azure.management.batchai.HorovodSettings;
import com.microsoft.azure.management.batchai.ImageSourceRegistry;
import com.microsoft.azure.management.batchai.InputDirectory;
import com.microsoft.azure.management.batchai.JobCreateParameters;
import com.microsoft.azure.management.batchai.JobPreparation;
import com.microsoft.azure.management.batchai.JobPriority;
import com.microsoft.azure.management.batchai.JobPropertiesConstraints;
import com.microsoft.azure.management.batchai.JobPropertiesExecutionInfo;
import com.microsoft.azure.management.batchai.JobsListOutputFilesOptions;
import com.microsoft.azure.management.batchai.KeyVaultSecretReference;
import com.microsoft.azure.management.batchai.MountVolumes;
import com.microsoft.azure.management.batchai.OutputDirectory;
import com.microsoft.azure.management.batchai.OutputDirectorySettings;
import com.microsoft.azure.management.batchai.OutputFile;
import com.microsoft.azure.management.batchai.ProvisioningState;
import com.microsoft.azure.management.batchai.PyTorchSettings;
import com.microsoft.azure.management.batchai.RemoteLoginInformation;
import com.microsoft.azure.management.batchai.ResourceId;
import com.microsoft.azure.management.batchai.TensorFlowSettings;
import com.microsoft.azure.management.batchai.ToolType;
import com.microsoft.azure.management.batchai.ToolTypeSettings;
import com.microsoft.azure.management.batchai.UnmanagedFileSystemReference;
import com.microsoft.azure.management.batchai.BatchAIWorkspace;
import com.microsoft.azure.management.batchai.model.HasMountVolumes;
import com.microsoft.azure.management.resources.fluentcore.arm.collection.implementation.ReadableWrappersImpl;
import com.microsoft.azure.management.resources.fluentcore.model.implementation.CreatableUpdatableImpl;
import com.microsoft.azure.management.resources.fluentcore.utils.PagedListConverter;
import com.microsoft.azure.management.resources.fluentcore.utils.Utils;
import org.joda.time.DateTime;
import rx.Completable;
import rx.Observable;
import rx.functions.Func1;
import java.util.ArrayList;
import java.util.List;
@LangDefinition
class BatchAIJobImpl
extends CreatableUpdatableImpl<
BatchAIJob,
JobInner,
BatchAIJobImpl>
implements BatchAIJob,
BatchAIJob.Definition,
HasMountVolumes {
private final BatchAIWorkspace workspace;
private final BatchAIExperiment experiment;
private JobCreateParameters createParameters = new JobCreateParameters();
BatchAIJobImpl(String name,
BatchAIExperimentImpl parent,
JobInner inner) {
super(name, inner);
this.workspace = parent.workspace();
this.experiment = parent;
}
@Override
protected Observable<JobInner> getInnerAsync() {
return workspace.manager().inner().jobs().getAsync(workspace.resourceGroupName(), workspace.name(), experiment.name(), this.name());
}
@Override
public boolean isInCreateMode() {
return inner().id() == null;
}
@Override
public Observable<BatchAIJob> createResourceAsync() {
return workspace.manager().inner().jobs().createAsync(
workspace.resourceGroupName(), workspace.name(), experiment.name(), this.name(), createParameters)
.map(new Func1<JobInner, BatchAIJob>() {
@Override
public BatchAIJob call(JobInner innerModel) {
BatchAIJobImpl.this.setInner(innerModel);
return BatchAIJobImpl.this;
}
});
}
@Override
public BatchAIJobImpl withStdOutErrPathPrefix(String stdOutErrPathPrefix) {
createParameters.withStdOutErrPathPrefix(stdOutErrPathPrefix);
return this;
}
@Override
public BatchAIJobImpl withNodeCount(int nodeCount) {
createParameters.withNodeCount(nodeCount);
return this;
}
@Override
public BatchAIJobImpl withJobPreparationCommandLine(String commandLine) {
createParameters.withJobPreparation(new JobPreparation().withCommandLine(commandLine));
return this;
}
@Override
public BatchAIJobImpl withInputDirectory(String id, String path) {
ensureInputDirectories().add(new InputDirectory().withId(id).withPath(path));
return this;
}
private List<InputDirectory> ensureInputDirectories() {
if (createParameters.inputDirectories() == null) {
createParameters.withInputDirectories(new ArrayList<InputDirectory>());
}
return createParameters.inputDirectories();
}
@Override
public BatchAIJobImpl withOutputDirectory(String id, String pathPrefix) {
if (createParameters.outputDirectories() == null) {
createParameters.withOutputDirectories(new ArrayList<OutputDirectory>());
}
createParameters.outputDirectories().add(new OutputDirectory().withId(id).withPathPrefix(pathPrefix));
return this;
}
@Override
public OutputDirectorySettings.DefinitionStages.Blank<BatchAIJob.DefinitionStages.WithCreate> defineOutputDirectory(String id) {
return new OutputDirectorySettingsImpl(new OutputDirectory().withId(id), this);
}
@Override
public BatchAIJobImpl withContainerImage(String image) {
if (ensureContainerSettings().imageSourceRegistry() == null) {
createParameters.containerSettings().withImageSourceRegistry(new ImageSourceRegistry());
}
createParameters.containerSettings().imageSourceRegistry().withImage(image);
return this;
}
@Override
public ContainerImageSettings.DefinitionStages.Blank<BatchAIJob.DefinitionStages.WithCreate> defineContainerSettings(String image) {
return new ContainerImageSettingsImpl(new ContainerSettings().withImageSourceRegistry(new ImageSourceRegistry().withImage(image)), this);
}
private ContainerSettings ensureContainerSettings() {
if (createParameters.containerSettings() == null) {
createParameters.withContainerSettings(new ContainerSettings());
}
return createParameters.containerSettings();
}
@Override
public ToolTypeSettings.CognitiveToolkit.DefinitionStages.Blank<BatchAIJob.DefinitionStages.WithCreate> defineCognitiveToolkit() {
return new CognitiveToolkitImpl(new CNTKsettings(), this);
}
@Override
public ToolTypeSettings.TensorFlow.DefinitionStages.Blank<BatchAIJob.DefinitionStages.WithCreate> defineTensorflow() {
return new TensorFlowImpl(new TensorFlowSettings(), this);
}
@Override
public ToolTypeSettings.Caffe.DefinitionStages.Blank<BatchAIJob.DefinitionStages.WithCreate> defineCaffe() {
return new CaffeImpl(new CaffeSettings(), this);
}
@Override
public ToolTypeSettings.Caffe2.DefinitionStages.Blank<BatchAIJob.DefinitionStages.WithCreate> defineCaffe2() {
return new Caffe2Impl(new Caffe2Settings(), this);
}
@Override
public ToolTypeSettings.Chainer.DefinitionStages.Blank<BatchAIJob.DefinitionStages.WithCreate> defineChainer() {
return new ChainerImpl(new ChainerSettings(), this);
}
@Override
public ToolTypeSettings.PyTorch.DefinitionStages.Blank<BatchAIJob.DefinitionStages.WithCreate> definePyTorch() {
return new PyTorchImpl(new PyTorchSettings(), this);
}
@Override
public ToolTypeSettings.CustomMpi.DefinitionStages.Blank<BatchAIJob.DefinitionStages.WithCreate> defineCustomMpi() {
return new CustomMpiImpl(new CustomMpiSettings(), this);
}
@Override
public ToolTypeSettings.Horovod.DefinitionStages.Blank<BatchAIJob.DefinitionStages.WithCreate> defineHorovod() {
return new HorovodImpl(new HorovodSettings(), this);
}
@Override
public ToolTypeSettings.CustomToolkit.DefinitionStages.Blank<BatchAIJob.DefinitionStages.WithCreate> defineCustomToolkit() {
return new CustomToolkitImpl(new CustomToolkitSettings(), this);
}
@Override
public BatchAIJobImpl withEnvironmentVariable(String name, String value) {
ensureEnvironmentVariables().add(new EnvironmentVariable().withName(name).withValue(value));
return this;
}
@Override
public BatchAIJobImpl withEnvironmentVariableSecretValue(String name, String value) {
ensureEnvironmentVariablesWithSecrets().add(new EnvironmentVariableWithSecretValue().withName(name).withValue(value));
return this;
}
@Override
public BatchAIJobImpl withEnvironmentVariableSecretValue(String name, String keyVaultId, String secretUrl) {
KeyVaultSecretReference secretReference = new KeyVaultSecretReference()
.withSourceVault(new ResourceId().withId(keyVaultId)).withSecretUrl(secretUrl);
ensureEnvironmentVariablesWithSecrets().add(new EnvironmentVariableWithSecretValue().withName(name).withValueSecretReference(secretReference));
return this;
}
@Override
public BatchAIJobImpl withExistingCluster(BatchAICluster cluster) {
createParameters.withCluster(new ResourceId().withId(cluster.id()));
return this;
}
@Override
public BatchAIJobImpl withExistingClusterId(String clusterId) {
createParameters.withCluster(new ResourceId().withId(clusterId));
return this;
}
@Override
public AzureFileShareImpl defineAzureFileShare() {
return new AzureFileShareImpl<BatchAIJob.DefinitionStages.WithCreate>(new AzureFileShareReference(), this);
}
@Override
public AzureBlobFileSystemImpl defineAzureBlobFileSystem() {
return new AzureBlobFileSystemImpl(new AzureBlobFileSystemReference(), this);
}
@Override
public FileServerImpl defineFileServer() {
return new FileServerImpl<BatchAIJob.DefinitionStages.WithCreate>(new FileServerReference(), this);
}
@Override
public BatchAIJobImpl withUnmanagedFileSystem(String mountCommand, String relativeMountPath) {
MountVolumes mountVolumes = ensureMountVolumes();
if (mountVolumes.unmanagedFileSystems() == null) {
mountVolumes.withUnmanagedFileSystems(new ArrayList<UnmanagedFileSystemReference>());
}
mountVolumes.unmanagedFileSystems().add(new UnmanagedFileSystemReference().withMountCommand(mountCommand).withRelativeMountPath(relativeMountPath));
return this;
}
@Override
public void attachAzureFileShare(AzureFileShare azureFileShare) {
MountVolumes mountVolumes = ensureMountVolumes();
if (mountVolumes.azureFileShares() == null) {
mountVolumes.withAzureFileShares(new ArrayList<AzureFileShareReference>());
}
mountVolumes.azureFileShares().add(azureFileShare.inner());
}
@Override
public void attachAzureBlobFileSystem(AzureBlobFileSystem azureBlobFileSystem) {
MountVolumes mountVolumes = ensureMountVolumes();
if (mountVolumes.azureBlobFileSystems() == null) {
mountVolumes.withAzureBlobFileSystems(new ArrayList<AzureBlobFileSystemReference>());
}
mountVolumes.azureBlobFileSystems().add(azureBlobFileSystem.inner());
}
@Override
public void attachFileServer(FileServer fileServer) {
MountVolumes mountVolumes = ensureMountVolumes();
if (mountVolumes.fileServers() == null) {
mountVolumes.withFileServers(new ArrayList<FileServerReference>());
}
mountVolumes.fileServers().add(fileServer.inner());
}
private MountVolumes ensureMountVolumes() {
if (createParameters.mountVolumes() == null) {
createParameters.withMountVolumes(new MountVolumes());
}
return createParameters.mountVolumes();
}
private List<EnvironmentVariableWithSecretValue> ensureEnvironmentVariablesWithSecrets() {
if (createParameters.secrets() == null) {
createParameters.withSecrets(new ArrayList<EnvironmentVariableWithSecretValue>());
}
return createParameters.secrets();
}
private List<EnvironmentVariable> ensureEnvironmentVariables() {
if (createParameters.environmentVariables() == null) {
createParameters.withEnvironmentVariables(new ArrayList<EnvironmentVariable>());
}
return createParameters.environmentVariables();
}
void attachCntkSettings(CognitiveToolkitImpl cognitiveToolkit) {
createParameters.withCntkSettings(cognitiveToolkit.inner());
}
void attachTensorFlowSettings(TensorFlowImpl tensorFlow) {
createParameters.withTensorFlowSettings(tensorFlow.inner());
}
void attachCaffeSettings(CaffeImpl caffe) {
createParameters.withCaffeSettings(caffe.inner());
}
void attachCaffe2Settings(Caffe2Impl caffe2) {
createParameters.withCaffe2Settings(caffe2.inner());
}
void attachChainerSettings(ChainerImpl chainer) {
createParameters.withChainerSettings(chainer.inner());
}
void attachPyTorchSettings(PyTorchImpl pyTorch) {
createParameters.withPyTorchSettings(pyTorch.inner());
}
void attachCustomMpiSettings(CustomMpiImpl customMpi) {
createParameters.withCustomMpiSettings(customMpi.inner());
}
void attachHorovodSettings(HorovodImpl horovod) {
createParameters.withHorovodSettings(horovod.inner());
}
void attachCustomToolkitSettings(CustomToolkitImpl customToolkit) {
createParameters.withCustomToolkitSettings(customToolkit.inner());
}
void attachOutputDirectory(OutputDirectorySettingsImpl outputDirectorySettings) {
if (createParameters.outputDirectories() == null) {
createParameters.withOutputDirectories(new ArrayList<OutputDirectory>());
}
createParameters.outputDirectories().add(outputDirectorySettings.inner());
}
void attachContainerSettings(ContainerImageSettingsImpl containerImageSettings) {
createParameters.withContainerSettings(containerImageSettings.inner());
}
@Override
public void terminate() {
terminateAsync().await();
}
@Override
public Completable terminateAsync() {
Observable<Void> stopObservable = workspace.manager().inner().jobs()
.terminateAsync(workspace.resourceGroupName(), workspace.name(), experiment.name(), this.name());
Observable<BatchAIJob> refreshObservable = refreshAsync();
return Observable.concat(stopObservable, refreshObservable).toCompletable();
}
@Override
public PagedList<OutputFile> listFiles(String outputDirectoryId) {
PagedListConverter<FileInner, OutputFile> converter = new PagedListConverter<FileInner, OutputFile>() {
@Override
public Observable<OutputFile> typeConvertAsync(FileInner fileInner) {
return Observable.just((OutputFile) new OutputFileImpl(fileInner));
}
};
return converter.convert(workspace.manager().inner().jobs()
.listOutputFiles(workspace.resourceGroupName(), workspace.name(), experiment.name(), name(), new JobsListOutputFilesOptions().withOutputdirectoryid(outputDirectoryId)));
}
@Override
public Observable<OutputFile> listFilesAsync(String outputDirectoryId) {
return ReadableWrappersImpl.convertPageToInnerAsync(workspace.manager().inner().jobs()
.listOutputFilesAsync(workspace.resourceGroupName(), workspace.name(), experiment.name(), name(),
new JobsListOutputFilesOptions().withOutputdirectoryid(outputDirectoryId))).map(new Func1<FileInner, OutputFile>() {
@Override
public OutputFile call(FileInner fileInner) {
return new OutputFileImpl(fileInner);
}
});
}
@Override
public PagedList<OutputFile> listFiles(String outputDirectoryId, String directory, Integer linkExpiryMinutes, Integer maxResults) {
PagedListConverter<FileInner, OutputFile> converter = new PagedListConverter<FileInner, OutputFile>() {
@Override
public Observable<OutputFile> typeConvertAsync(FileInner fileInner) {
return Observable.just((OutputFile) new OutputFileImpl(fileInner));
}
};
return converter.convert(workspace.manager().inner().jobs()
.listOutputFiles(workspace.resourceGroupName(), workspace.name(), experiment.name(), name(),
new JobsListOutputFilesOptions().withOutputdirectoryid(outputDirectoryId)
.withDirectory(directory)
.withLinkexpiryinminutes(linkExpiryMinutes)
.withMaxResults(maxResults)));
}
@Override
public Observable<OutputFile> listFilesAsync(String outputDirectoryId, String directory, Integer linkExpiryMinutes, Integer maxResults) {
return ReadableWrappersImpl.convertPageToInnerAsync(workspace.manager().inner().jobs()
.listOutputFilesAsync(workspace.resourceGroupName(), workspace.name(), experiment.name(), name(),
new JobsListOutputFilesOptions().withOutputdirectoryid(outputDirectoryId)
.withDirectory(directory)
.withLinkexpiryinminutes(linkExpiryMinutes)
.withMaxResults(maxResults)))
.map(new Func1<FileInner, OutputFile>() {
@Override
public OutputFile call(FileInner fileInner) {
return new OutputFileImpl(fileInner);
}
});
}
@Override
public PagedList<RemoteLoginInformation> listRemoteLoginInformation() {
PagedListConverter<RemoteLoginInformationInner, RemoteLoginInformation> converter = new PagedListConverter<RemoteLoginInformationInner, RemoteLoginInformation>() {
@Override
public Observable<RemoteLoginInformation> typeConvertAsync(RemoteLoginInformationInner inner) {
return Observable.just((RemoteLoginInformation) new RemoteLoginInformationImpl(inner));
}
};
return converter.convert(workspace.manager().inner().jobs()
.listRemoteLoginInformation(workspace.resourceGroupName(), workspace.name(), experiment.name(), name()));
}
@Override
public Observable<RemoteLoginInformation> listRemoteLoginInformationAsync() {
return ReadableWrappersImpl.convertPageToInnerAsync(workspace.manager().inner().jobs()
.listRemoteLoginInformationAsync(workspace.resourceGroupName(), workspace.name(), experiment.name(), name()))
.map(new Func1<RemoteLoginInformationInner, RemoteLoginInformation>() {
@Override
public RemoteLoginInformation call(RemoteLoginInformationInner remoteLoginInformationInner) {
return new RemoteLoginInformationImpl(remoteLoginInformationInner);
}
});
}
@Override
public BatchAIExperiment experiment() {
return experiment;
}
@Override
public JobPriority schedulingPriority() {
return inner().schedulingPriority();
}
@Override
public ResourceId cluster() {
return inner().cluster();
}
@Override
public MountVolumes mountVolumes() {
return inner().mountVolumes();
}
@Override
public String jobOutputDirectoryPathSegment() {
return inner().jobOutputDirectoryPathSegment();
}
@Override
public int nodeCount() {
return Utils.toPrimitiveInt(inner().nodeCount());
}
@Override
public ContainerSettings containerSettings() {
return inner().containerSettings();
}
@Override
public ToolType toolType() {
return inner().toolType();
}
@Override
public CNTKsettings cntkSettings() {
return inner().cntkSettings();
}
@Override
public PyTorchSettings pyTorchSettings() {
return inner().pyTorchSettings();
}
@Override
public TensorFlowSettings tensorFlowSettings() {
return inner().tensorFlowSettings();
}
@Override
public CaffeSettings caffeSettings() {
return inner().caffeSettings();
}
@Override
public ChainerSettings chainerSettings() {
return inner().chainerSettings();
}
@Override
public CustomToolkitSettings customToolkitSettings() {
return inner().customToolkitSettings();
}
@Override
public JobPreparation jobPreparation() {
return inner().jobPreparation();
}
@Override
public String stdOutErrPathPrefix() {
return inner().stdOutErrPathPrefix();
}
@Override
public List<InputDirectory> inputDirectories() {
return inner().inputDirectories();
}
@Override
public List<OutputDirectory> outputDirectories() {
return inner().outputDirectories();
}
@Override
public List<EnvironmentVariable> environmentVariables() {
return inner().environmentVariables();
}
@Override
public List<EnvironmentVariableWithSecretValue> secrets() {
return inner().secrets();
}
@Override
public JobPropertiesConstraints constraints() {
return inner().constraints();
}
@Override
public DateTime creationTime() {
return inner().creationTime();
}
@Override
public ProvisioningState provisioningState() {
return inner().provisioningState();
}
@Override
public DateTime provisioningStateTransitionTime() {
return inner().provisioningStateTransitionTime();
}
@Override
public ExecutionState executionState() {
return inner().executionState();
}
@Override
public DateTime executionStateTransitionTime() {
return inner().executionStateTransitionTime();
}
@Override
public JobPropertiesExecutionInfo executionInfo() {
return inner().executionInfo();
}
@Override
public String id() {
return inner().id();
}
}