Cole Murray
05/07/2022, 3:53 AM
orionServer.addContainer('ServiceContainer', {
  essential: true,
  image: ContainerImage.fromDockerImageAsset(serviceImage),
  logging: new AwsLogDriver({
    streamPrefix: 'orion-server'
  }),
  portMappings: [{
    containerPort: this.props.orionEnvVars?.port || Number(this.CONTAINER_PORT)
  }],
  environment: {
    PREFECT_ORION_API_HOST: this.props.orionEnvVars?.host || '0.0.0.0',
    PREFECT_ORION_API_PORT: this.props.orionEnvVars?.port?.toString() || this.CONTAINER_PORT,
    PREFECT_ORION_DATABASE_CONNECTION_URL: `postgresql+asyncpg://${this.props.username}:${ecsSecret.fromSecretsManager(this.props.dbPassword)}@${this.props.databaseHost}/orion`
  },
  secrets: {
  },
  memoryLimitMiB: 300,
});
We can overcome this by altering the settings provided, and fetching the username and password separately from the host to build the DB URI, perhaps as a second set of options.
Has anyone found an alternate way to concatenate the strings? If not, are we open to adding options that let DB_PASSWORD be injected as a secret?
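For reference, one workaround might be to pass the username and host as plain environment variables, inject the password through the ECS secrets map, and assemble the URL inside the container. A rough Python sketch of the container-side step (DB_USER, DB_HOST, and DB_PASSWORD are hypothetical names, with DB_PASSWORD assumed to be populated from the secrets map):
import os

# DB_PASSWORD arrives via the ECS `secrets` mapping, so the secret value
# never appears in the task definition; the other two are plain env vars.
os.environ["PREFECT_ORION_DATABASE_CONNECTION_URL"] = (
    "postgresql+asyncpg://"
    f"{os.environ['DB_USER']}:{os.environ['DB_PASSWORD']}"
    f"@{os.environ['DB_HOST']}/orion"
)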
Slackbot
05/07/2022, 6:40 PM
Laksh Aithani
05/07/2022, 7:16 PM
Artem Vysotsky
05/07/2022, 10:58 PM
The database upgrade command does not respect the PREFECT_SERVER__HASURA__ADMIN_SECRET value.
Jan Domanski
05/08/2022, 1:30 PM
@task
def generate_numbers():
    return [1, 2, 3, 4]

@task
def compute_sth_expensive(number):
    return number ** 2

@flow
def pipeline():
    result_generate_numbers = generate_numbers()
    results = map(compute_sth_expensive, result_generate_numbers)
    for r in results:
        r.result()  ## ??
Is that an acceptable pattern? I want to do a parallel calculation over result_generate_numbers and then perform some gather-like operation.
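For comparison, a mapped version of the same flow might look like this in later Prefect 2 releases (a sketch, assuming Task.map and PrefectFuture.result are available; it reuses the two tasks defined above):
@flow
def pipeline():
    numbers = generate_numbers()
    # map fans out one task run per element of the upstream result
    futures = compute_sth_expensive.map(numbers)
    # gather step: block on each future and collect the computed values
    return [f.result() for f in futures]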
Artem Vysotsky
05/08/2022, 3:44 PM
import flow
from prefect.deployments import DeploymentSpec
from prefect.orion.schemas.schedules import CronSchedule

def create_deployment(name: str,
                      user_id: str,
                      job_id: str,
                      schedule: str):
    d = DeploymentSpec(
        flow=flow.flow,
        name=name,
        schedule=CronSchedule(
            cron=schedule
        ),
        parameters={
            "user_id": user_id,
            "job_id": job_id
        }
    )
    d.create_deployment()
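A call like the following would then register one scheduled deployment (all argument values here are hypothetical):
create_deployment(
    name="nightly-job",
    user_id="user-123",
    job_id="job-456",
    schedule="0 2 * * *",  # every day at 02:00
)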
Ievgenii Martynenko
05/08/2022, 6:13 PM
with Flow(name="Flow", executor=executor, storage=storage, run_config=run_config) as flow:
Is there a way it can also start locally, as:
with Flow(name="Flow") as local_flow:
while the flow itself remains the same?
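One pattern that might work in Prefect 1.x, since executor, storage, and run_config are plain attributes, is to define the flow once and attach the remote settings only when not running locally (a sketch):
with Flow(name="Flow") as flow:
    ...  # same tasks as before

if __name__ == "__main__":
    flow.run()  # runs in-process; storage and run_config are not needed
else:
    flow.executor = executor
    flow.storage = storage
    flow.run_config = run_config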
Sander
05/08/2022, 6:33 PM
Suresh R
05/09/2022, 2:41 AM
SeungHyeon Wang
05/09/2022, 3:20 AM
Ievgenii Martynenko
05/09/2022, 8:50 AMimport os.path as p
from <http://flows.xxx|flows.xxx> import testflow
from prefect.storage import S3
flows = [testflow]
root = p.dirname(p.realpath(__file__))
storage = S3(stored_as_script=True, key='testflow.py', bucket='test')
if __name__ == '__main__':
for flow_file in flows:
flow = flow_file.flow
print(f"Registering flow {flow.name} from {flow_file}")
storage.add_flow(flow)
flow.register(
project_name='test',
idempotency_key=flow.serialized_hash()
)
Flow itself:
name = "testflow"
executor = LocalDaskExecutor()
storage = S3(stored_as_script=True, key='testflow.py', bucket='test')
run_config = KubernetesRun(
    job_template_path='https://XXX/job_template/k8s_job_template.yaml')

with Flow(name=name, executor=executor, storage=storage, run_config=run_config) as flow:
    ....
The error I get is the same as when no storage is added to the flow:
Failed to load and execute flow run: ValueError('Flow is not contained in this Storage')
What am I missing?
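One possible explanation: the flow file defines its own S3 storage, but the flow is only added to the separate storage object created in the registration script, so the storage the agent loads does not contain the flow. A sketch of one fix, assuming testflow.py is already uploaded to s3://test/testflow.py, is to attach the storage to the flow itself and register without rebuilding:
flow.storage = storage
flow.storage.add_flow(flow)
flow.register(
    project_name='test',
    build=False,
    idempotency_key=flow.serialized_hash()
)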
Nacho Rodriguez
05/09/2022, 8:54 AM
Florian Guily
05/09/2022, 10:08 AM
Assaf Ben Shimon
05/09/2022, 11:29 AM
prefect.exceptions.AuthorizationError: Malformed response received from Cloud - please ensure that you are authenticated. See `prefect auth login --help`.
However, when I run prefect auth status, it says I'm connected and authenticated.
Any idea what's wrong?
Rafael Afonso Rodrigues
05/09/2022, 12:45 PM
Artem Vysotsky
05/09/2022, 12:51 PM
Here is the flow directory:
(venv) ➜ src git:(main) ✗ ls -la flow
drwxr-xr-x 6 avysotsky staff 192 May 9 08:47 .
drwxr-xr-x 5 avysotsky staff 160 May 9 08:34 ..
-rw-r--r-- 1 avysotsky staff 3301 May 9 08:47 flow.py
-rw-r--r-- 1 avysotsky staff 870 May 8 10:11 graphql.py
-rw-r--r-- 1 avysotsky staff 611 May 9 08:47 prefect_client.py
And here is how I create the deployment:
d = DeploymentSpec(
    flow_location="./flow/flow.py",
    name=name,
    schedule=CronSchedule(
        cron=schedule
    ),
    tags=[
        f"user_id:{user_id}",
        f"job_id:{job_id}"
    ],
    parameters={
        "user_id": user_id,
        "job_id": job_id
    }
)
Is Prefect smart enough to pull in all the dependencies that flow.py needs?
Shuchita Tripathi
05/09/2022, 1:39 PM
Rhys Mansal
05/09/2022, 1:54 PM
Can I set a resource_manager cleanup method upstream of another task?
Jason
05/09/2022, 2:07 PM
Malthe Karbo
05/09/2022, 2:09 PM
Xavier Babu
05/09/2022, 2:32 PM
Dylan
05/09/2022, 3:40 PM
William Wagner
05/09/2022, 4:54 PM
1. When a flow script is run directly (as __main__), is there an in-process Orion orchestrator running alongside it?
2. If 1), is the flow metadata persisted in a local SQLite instance on disk?
3. Is this "ephemeral" serverless workflow intended purely for testing purposes, or is it production viable?
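Regarding 1) and 2), one way to check might be to inspect the settings (a sketch against Prefect 2 / Orion; the exact setting names may vary by release):
from prefect.settings import PREFECT_API_URL, PREFECT_ORION_DATABASE_CONNECTION_URL

# With no API URL configured, flow runs talk to an in-process,
# "ephemeral" Orion application rather than a separate server.
print(PREFECT_API_URL.value())
# By default the metadata lands in a local SQLite database under ~/.prefect.
print(PREFECT_ORION_DATABASE_CONNECTION_URL.value())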
Sumant Agnihotri
05/09/2022, 5:31 PM
I have three tasks, a(), b(), and c(), each of which waits for 2 secs and prints the current time.
Next, I created the following flows:
with Flow("flow-a") as flow_a:
    a()
    b()

with Flow("parent-flow") as flow:
    c()

flow_a.run(executor=LocalDaskExecutor())
flow.run(executor=LocalDaskExecutor())
Here, to my surprise, tasks a and b run first, then after 2 seconds task c runs.
Example output:
b: 23:04:32
a: 23:04:32
c: 23:04:34
I want all 3 to run in parallel; what am I doing wrong? (Sorry if this is not the right forum to ask these questions.)
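(One detail that may be relevant: flow.run() blocks until the flow finishes, so two consecutive .run() calls can never overlap. A sketch of a single combined flow, in which LocalDaskExecutor is free to run all three independent tasks in parallel:)
with Flow("combined-flow") as combined:
    a()
    b()
    c()

combined.run(executor=LocalDaskExecutor())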
Jan Domanski
05/09/2022, 6:37 PM
Christian Nuss
05/09/2022, 6:52 PM
Michelle Brochmann
05/09/2022, 7:45 PM
Are the S3Download, S3List, and S3Upload tasks (available in 1.2 from the prefect.tasks.aws.s3 module) still available?
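For reference, in 1.2 these tasks can be used roughly like this (a sketch; the bucket name and keys are hypothetical):
from prefect import Flow
from prefect.tasks.aws.s3 import S3Download, S3List, S3Upload

with Flow("s3-example") as flow:
    keys = S3List(bucket="my-bucket")(prefix="data/")
    blob = S3Download(bucket="my-bucket")(key="data/input.csv")
    S3Upload(bucket="my-bucket")(data=blob, key="copies/input.csv")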
Jason
05/09/2022, 7:49 PM
Alvaro Durán Tovar
05/09/2022, 8:49 PM
Mike Vanbuskirk
05/09/2022, 9:28 PM