Zachary Loertscher
03/15/2023, 1:50 PM
airbyte_connection_ids into the flow.
I want to reference an externally stored dictionary (i.e. a .json file stored in S3) so that it's easier to change when we redeploy Airbyte connections and get new airbyte_connection_ids.
Is this possible? Can a Prefect parameter take values from an external source? We are running on Prefect version 1.2.4.
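One pattern that might address this on Prefect 1.x (a hedged sketch, not an official recipe): instead of baking the dictionary into a Parameter at registration time, have a task fetch the JSON from S3 at the start of every flow run. The bucket, key, and task names below are placeholders.
import json
import boto3
from prefect import Flow, task

@task
def load_connection_ids(bucket: str, key: str) -> dict:
    # Fetch the externally stored dictionary from S3 at runtime,
    # so redeploying Airbyte connections only requires updating the file.
    body = boto3.client("s3").get_object(Bucket=bucket, Key=key)["Body"].read()
    return json.loads(body)

@task
def trigger_airbyte_syncs(connection_ids: dict):
    for name, connection_id in connection_ids.items():
        print(f"Would trigger Airbyte sync '{name}' ({connection_id})")

with Flow("airbyte-syncs") as flow:
    connection_ids = load_connection_ids("my-config-bucket", "airbyte/connection_ids.json")
    trigger_airbyte_syncs(connection_ids)
Because the lookup happens inside a task, the values are read fresh on every run rather than being frozen into the registered flow.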
Noam Banay
03/15/2023, 1:57 PM
get_directory function) and execute the code with the flow decorator. For example:
from prefect import flow

@flow
def example():
    with open("my/code/path.py") as f:
        exec(f.read())
I want to know if this is the best practice, and also how I can make sure the code I'm migrating from the other repository is up to date before the flow starts to run.
Thanks
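One hedged option for keeping the migrated code fresh, assuming it lives behind a GitHub filesystem block (the block name and paths below are placeholders): pull the directory at the top of the flow run, then exec the file.
from prefect import flow
from prefect.filesystems import GitHub

@flow
def run_external_code():
    # Pull a fresh copy of the other repository before executing,
    # so the code is up to date at the start of every flow run.
    github_block = GitHub.load("my-repo")  # hypothetical block name
    github_block.get_directory(local_path="external")
    with open("external/my/code/path.py") as f:
        exec(f.read())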
Rikimaru Yamaguchi
03/15/2023, 2:37 PM
secrets as follows.
"secrets": [
{
"name": "AWS_ACCESS_KEY_ID",
"valueFrom": "/copilot/app/prd/secrets/AWS_ACCESS_KEY_ID"
},
]
Thank you.
Malavika S Menon
03/15/2023, 4:47 PM
Andrew
03/15/2023, 5:19 PM
Leon Kozlowski
03/15/2023, 5:48 PM
Jarvis Stubblefield
03/15/2023, 6:39 PM
Paco Ibañez
03/15/2023, 6:51 PM
Charles Leung
03/15/2023, 10:34 PM
from prefect_aws import S3Bucket

async def uploadJsontoS3(path):
    s3_bucket = await S3Bucket.load("my_bucket")
    with open(r"\\filepathtoJson", "rb") as f:
        await s3_bucket.upload_from_file_object(f, "aws-batch-args/testArgs.json")
Albert Wong
03/16/2023, 1:01 AM
Mohammad Kaif Rizvi
03/16/2023, 4:38 AM
from prefect import task
from google.cloud import dataproc_v1beta2 as dataproc

@task
def submit_pyspark_job():
    # Create a client to interact with the Dataproc API
    client = dataproc.JobControllerClient()

    # Set the Dataproc cluster and job parameters
    project_id = "<project-id>"
    region = "<region>"
    cluster_name = "<cluster-name>"
    main_python_file_uri = "<main-python-file-uri>"
    args = ["<args>"]

    # Create the job request
    job = {
        "placement": {"cluster_name": cluster_name},
        "pyspark_job": {
            "main_python_file_uri": main_python_file_uri,
            "args": args
        }
    }

    # Submit the job to the Dataproc cluster
    result = client.submit_job_as_operation(
        request={
            "project_id": project_id,
            "region": region,
            "job": job
        }
    )

    # Get the ID of the submitted job
    job_id = result.name.split("/")[-1]
    print(f"Submitted PySpark job with ID: {job_id}")
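For context, a minimal (hypothetical) wrapper for the task above, since Prefect 2 tasks are normally called from within a flow:
from prefect import flow

@flow
def dataproc_flow():
    # Assumes submit_pyspark_job from the snippet above is defined in this module
    submit_pyspark_job()

if __name__ == "__main__":
    dataproc_flow()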
Stephen Lloyd
03/16/2023, 7:38 AM
.submit(). I want to make a bunch of API calls in parallel and build a list of results. Can anyone help me with this?
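A minimal pattern that might fit, assuming Prefect 2.x; the fetch task, flow name, and URL parameter are hypothetical.
from typing import List

import httpx
from prefect import flow, task

@task
def call_api(url: str) -> dict:
    # One API call per task run
    return httpx.get(url).json()

@flow
def fan_out_flow(urls: List[str]) -> List[dict]:
    # .submit() schedules each task run and returns a future immediately,
    # so the calls run concurrently on the default ConcurrentTaskRunner.
    futures = [call_api.submit(url) for url in urls]
    # .result() blocks until each task run finishes; order is preserved.
    return [future.result() for future in futures]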
Samuel Bunce
03/16/2023, 7:57 AM
Wellington Braga
03/16/2023, 11:23 AM
Nataliia Sherstneva
03/16/2023, 3:24 PM
def upload_dir_to_s3(local_model_dir, logger, target_model_dir=None):
    s3_bucket = S3Bucket.load(BLOCK_NAME_IRS_TRAIN_DATA_S3)
    if not target_model_dir:
        target_model_dir = local_model_dir
    try:
        s3_bucket.upload_from_folder(from_folder=local_model_dir, to_folder=f"{target_model_dir}")
        logger.info(f"Success! Directory '{local_model_dir}' was uploaded to '{s3_bucket.bucket_name}' Bucket.")
    except Exception:
        raise IOError(f"Saving to '{s3_bucket.bucket_name}' Bucket Failed!")
Then I download this model from the S3 bucket, but not within a Prefect workflow; I do it separately with boto3.client. Locally it works, but in the K8s cluster I get a Forbidden error:
botocore.exceptions.ClientError: An error occurred (403) when calling the HeadObject operation: Forbidden
Could you please tell me what the problem might be? Should I save or read the model in another way?
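A hedged debugging sketch (bucket and key names are placeholders): print which AWS identity the pod actually resolves to, since a 403 on HeadObject usually means that identity lacks s3:GetObject/s3:ListBucket on the bucket, then repeat the failing call directly.
import boto3

# Which credentials is boto3 picking up inside the cluster?
print(boto3.client("sts").get_caller_identity()["Arn"])

# Reproduce the failing HeadObject call with placeholder names.
boto3.client("s3").head_object(Bucket="<bucket-name>", Key="<model-key>")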
Guy Altman
03/16/2023, 4:37 PM
Kyle Austin
03/16/2023, 5:31 PM
from prefect import task, flow, get_run_logger
from logging import Logger

@task()
def pop_element_zero(ls: list, logger: Logger) -> int:
    val = ls.pop(0)
    logger.info(f'Just popped {val} from the list {ls}')
    return val

@flow()
def pop_task_flow():
    logger = get_run_logger()
    ls = [1, 2, 3]
    while ls:
        pop_element_zero(ls=ls, logger=logger)

if __name__ == "__main__":
    pop_task_flow()
It makes sense (really good for making tasks retry-proof when the inputs are immutable objects), but I have some flows where I would like not to have this behavior. Is there a task decorator param that I am missing that would let the task work directly on its inputs rather than on copies?
kiran
03/16/2023, 5:43 PM
Shaoyi Zhang
03/16/2023, 6:12 PM
Task run 'xxxxx' already finished.
... exception stack trace ...
prefect.exceptions.MissingResult: State data is missing. Typically, this occurs when result persistence is disabled and the state has been retrieved from the API.
If I manually set all task runs in the flow run to Failed, then the flow retry succeeds. Is this a bug or a feature? My expectation is that when we hit the retry button for a flow run, the state of its task runs should get reset. Am I missing something? Thank you!
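For reference, a hedged sketch of how task results can be persisted so that a manual flow-run retry can re-hydrate finished task runs instead of raising MissingResult (task and flow names are placeholders):
from prefect import flow, task

@task(persist_result=True)
def extract() -> list:
    return [1, 2, 3]

@task(persist_result=True)
def load(rows: list) -> None:
    print(f"loaded {len(rows)} rows")

# With results persisted (by default under ~/.prefect/storage on the machine
# running the flow), a retried flow run can read back the completed tasks' data.
@flow
def etl():
    load(extract())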
Adam
03/16/2023, 6:24 PM
Mansour Zayer
03/16/2023, 7:03 PM
Ron Bergeron
03/16/2023, 7:25 PM
prefect profile create dev
Then I tried to use the profile by issuing the following command and received the errors below.
prefect profile use dev
Error screen snip:
Paco Ibañez
03/16/2023, 7:43 PM
Austin Weisgrau
03/16/2023, 7:50 PM
prefect block register sometimes raises alembic.script.revision.ResolutionError: No such revision or branch 'XXXX'. Anyone know what causes this and how to resolve it?
Aiden Price
03/17/2023, 1:33 AM
my_block.save("my-block-slug")
Nimesh Kumar
03/17/2023, 5:19 AM
Mohammad Kaif Rizvi
03/17/2023, 6:04 AM
Shivam Yadav
03/17/2023, 8:57 AM
import json

import prefect
import scripts
from prefect import flow, task
from prefect.blocks.system import Secret, String
from prefect_databricks import DatabricksCredentials
from prefect_databricks.flows import jobs_runs_submit_and_wait_for_completion

@flow(name='Databricks-Flow')
def databricks_flow():
    secret_block = Secret.load("prd-databricks-token")
    print(secret_block)

if __name__ == "__main__":
    databricks_flow()
While executing this I am getting the error below. Can anyone please help me resolve it? FYI: this piece of code works perfectly on my friend's system.
raise PrefectHTTPStatusError.from_httpx_error(exc) from exc.__cause__
prefect.exceptions.PrefectHTTPStatusError: Client error '403 Forbidden' for url 'https://api.prefect.cloud/api/accounts/470d1ece-3f38-482c-8641-2b9756b4781e/workspaces/b7f3ff33-c0b3-4c1e-b1a3-794ef4f674b1/flows/'
Response: {'detail': 'Forbidden'}
For more information check: https://httpstatuses.com/403
Clovis
03/17/2023, 9:06 AM
import typing

import pydantic

class ModelB(pydantic.BaseModel):
    mandatory: str

class ModelA(pydantic.BaseModel):
    some_values: typing.Any
    b: ModelB | None = None
In some cases I don't need to instantiate ModelB, and to do so I just don't add it to the deployment's YAML. This was working fine all along, but this morning the UI prevents me from starting a Quick Run of the deployment because my field ModelB.mandatory is missing (even though I did not specify ModelB in ModelA in the YAML).
Did something change, so that I now have to change my model parameters?
03/17/2023, 9:28 AM
@task(persist_result=True)
def task():
    ...

@flow(persist_result=True)
def flow():
    task()
As you can see, the task is called synchronously and I don't get any results back, but I keep getting:
State message: Flow run encountered an exception. MissingResult: State data is missing. Typically, this occurs when result persistence is disabled and the state has been retrieved from the API.